1 /* SPDX-License-Identifier: LGPL-2.1+ */
3 Copyright © 2012 Zbigniew Jędrzejewski-Szmek
11 #include <sys/prctl.h>
12 #include <sys/socket.h>
15 #include "sd-daemon.h"
17 #include "alloc-util.h"
21 #include "journal-file.h"
22 #include "journal-remote-write.h"
23 #include "journal-remote.h"
24 #include "journald-native.h"
26 #include "parse-util.h"
27 #include "process-util.h"
28 #include "socket-util.h"
29 #include "stdio-util.h"
30 #include "string-util.h"
33 #define REMOTE_JOURNAL_PATH "/var/log/journal/remote"
35 #define filename_escape(s) xescape((s), "/ ")
37 static int open_output(RemoteServer
*s
, Writer
*w
, const char* host
) {
38 _cleanup_free_
char *_filename
= NULL
;
42 switch (s
->split_mode
) {
43 case JOURNAL_WRITE_SPLIT_NONE
:
47 case JOURNAL_WRITE_SPLIT_HOST
: {
48 _cleanup_free_
char *name
;
52 name
= filename_escape(host
);
56 r
= asprintf(&_filename
, "%s/remote-%s.journal", s
->output
, name
);
65 assert_not_reached("what?");
68 r
= journal_file_open_reliably(filename
,
70 s
->compress
, (uint64_t) -1, s
->seal
,
75 return log_error_errno(r
, "Failed to open output journal %s: %m", filename
);
77 log_debug("Opened output file %s", w
->journal
->path
);
81 /**********************************************************************
82 **********************************************************************
83 **********************************************************************/
85 static int init_writer_hashmap(RemoteServer
*s
) {
86 static const struct hash_ops
* const hash_ops
[] = {
87 [JOURNAL_WRITE_SPLIT_NONE
] = NULL
,
88 [JOURNAL_WRITE_SPLIT_HOST
] = &string_hash_ops
,
92 assert(s
->split_mode
>= 0 && s
->split_mode
< (int) ELEMENTSOF(hash_ops
));
94 s
->writers
= hashmap_new(hash_ops
[s
->split_mode
]);
101 int journal_remote_get_writer(RemoteServer
*s
, const char *host
, Writer
**writer
) {
102 _cleanup_(writer_unrefp
) Writer
*w
= NULL
;
106 switch(s
->split_mode
) {
107 case JOURNAL_WRITE_SPLIT_NONE
:
108 key
= "one and only";
111 case JOURNAL_WRITE_SPLIT_HOST
:
117 assert_not_reached("what split mode?");
120 w
= hashmap_get(s
->writers
, key
);
128 if (s
->split_mode
== JOURNAL_WRITE_SPLIT_HOST
) {
129 w
->hashmap_key
= strdup(key
);
134 r
= open_output(s
, w
, host
);
138 r
= hashmap_put(s
->writers
, w
->hashmap_key
?: key
, w
);
143 *writer
= TAKE_PTR(w
);
148 /**********************************************************************
149 **********************************************************************
150 **********************************************************************/
152 /* This should go away as soon as µhttpd allows state to be passed around. */
153 RemoteServer
*journal_remote_server_global
;
155 static int dispatch_raw_source_event(sd_event_source
*event
,
159 static int dispatch_raw_source_until_block(sd_event_source
*event
,
161 static int dispatch_blocking_source_event(sd_event_source
*event
,
163 static int dispatch_raw_connection_event(sd_event_source
*event
,
168 static int get_source_for_fd(RemoteServer
*s
,
169 int fd
, char *name
, RemoteSource
**source
) {
173 /* This takes ownership of name, but only on success. */
178 if (!GREEDY_REALLOC0(s
->sources
, s
->sources_size
, fd
+ 1))
181 r
= journal_remote_get_writer(s
, name
, &writer
);
183 return log_warning_errno(r
, "Failed to get writer for source %s: %m",
186 if (s
->sources
[fd
] == NULL
) {
187 s
->sources
[fd
] = source_new(fd
, false, name
, writer
);
188 if (!s
->sources
[fd
]) {
189 writer_unref(writer
);
196 *source
= s
->sources
[fd
];
200 static int remove_source(RemoteServer
*s
, int fd
) {
201 RemoteSource
*source
;
204 assert(fd
>= 0 && fd
< (ssize_t
) s
->sources_size
);
206 source
= s
->sources
[fd
];
208 /* this closes fd too */
210 s
->sources
[fd
] = NULL
;
217 int journal_remote_add_source(RemoteServer
*s
, int fd
, char* name
, bool own_name
) {
218 RemoteSource
*source
= NULL
;
221 /* This takes ownership of name, even on failure, if own_name is true. */
233 r
= get_source_for_fd(s
, fd
, name
, &source
);
235 log_error_errno(r
, "Failed to create source for fd:%d (%s): %m",
241 r
= sd_event_add_io(s
->events
, &source
->event
,
242 fd
, EPOLLIN
|EPOLLRDHUP
|EPOLLPRI
,
243 dispatch_raw_source_event
, source
);
245 /* Add additional source for buffer processing. It will be
247 r
= sd_event_add_defer(s
->events
, &source
->buffer_event
,
248 dispatch_raw_source_until_block
, source
);
250 sd_event_source_set_enabled(source
->buffer_event
, SD_EVENT_OFF
);
251 } else if (r
== -EPERM
) {
252 log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd
, name
);
253 r
= sd_event_add_defer(s
->events
, &source
->event
,
254 dispatch_blocking_source_event
, source
);
256 sd_event_source_set_enabled(source
->event
, SD_EVENT_ON
);
259 log_error_errno(r
, "Failed to register event source for fd:%d: %m",
264 r
= sd_event_source_set_description(source
->event
, name
);
266 log_error_errno(r
, "Failed to set source name for fd:%d: %m", fd
);
270 return 1; /* work to do */
273 remove_source(s
, fd
);
277 int journal_remote_add_raw_socket(RemoteServer
*s
, int fd
) {
279 _cleanup_close_
int fd_
= fd
;
280 char name
[STRLEN("raw-socket-") + DECIMAL_STR_MAX(int) + 1];
284 r
= sd_event_add_io(s
->events
, &s
->listen_event
,
286 dispatch_raw_connection_event
, s
);
290 xsprintf(name
, "raw-socket-%d", fd
);
292 r
= sd_event_source_set_description(s
->listen_event
, name
);
301 /**********************************************************************
302 **********************************************************************
303 **********************************************************************/
305 int journal_remote_server_init(
308 JournalWriteSplitMode split_mode
,
316 assert(journal_remote_server_global
== NULL
);
317 journal_remote_server_global
= s
;
319 s
->split_mode
= split_mode
;
320 s
->compress
= compress
;
325 else if (split_mode
== JOURNAL_WRITE_SPLIT_NONE
)
326 s
->output
= REMOTE_JOURNAL_PATH
"/remote.journal";
327 else if (split_mode
== JOURNAL_WRITE_SPLIT_HOST
)
328 s
->output
= REMOTE_JOURNAL_PATH
;
330 assert_not_reached("bad split mode");
332 r
= sd_event_default(&s
->events
);
334 return log_error_errno(r
, "Failed to allocate event loop: %m");
336 r
= init_writer_hashmap(s
);
344 static void MHDDaemonWrapper_free(MHDDaemonWrapper
*d
) {
345 MHD_stop_daemon(d
->daemon
);
346 sd_event_source_unref(d
->io_event
);
347 sd_event_source_unref(d
->timer_event
);
352 RemoteServer
* journal_remote_server_destroy(RemoteServer
*s
) {
356 hashmap_free_with_destructor(s
->daemons
, MHDDaemonWrapper_free
);
359 assert(s
->sources_size
== 0 || s
->sources
);
360 for (i
= 0; i
< s
->sources_size
; i
++)
364 writer_unref(s
->_single_writer
);
365 hashmap_free(s
->writers
);
367 sd_event_source_unref(s
->sigterm_event
);
368 sd_event_source_unref(s
->sigint_event
);
369 sd_event_source_unref(s
->listen_event
);
370 sd_event_unref(s
->events
);
372 if (s
== journal_remote_server_global
)
373 journal_remote_server_global
= NULL
;
375 /* fds that we're listening on remain open... */
379 /**********************************************************************
380 **********************************************************************
381 **********************************************************************/
383 int journal_remote_handle_raw_source(
384 sd_event_source
*event
,
389 RemoteSource
*source
;
392 /* Returns 1 if there might be more data pending,
393 * 0 if data is currently exhausted, negative on error.
396 assert(fd
>= 0 && fd
< (ssize_t
) s
->sources_size
);
397 source
= s
->sources
[fd
];
398 assert(source
->importer
.fd
== fd
);
400 r
= process_source(source
, s
->compress
, s
->seal
);
401 if (journal_importer_eof(&source
->importer
)) {
404 log_debug("EOF reached with source %s (fd=%d)",
405 source
->importer
.name
, source
->importer
.fd
);
407 remaining
= journal_importer_bytes_remaining(&source
->importer
);
409 log_notice("Premature EOF. %zu bytes lost.", remaining
);
410 remove_source(s
, source
->importer
.fd
);
411 log_debug("%zu active sources remaining", s
->active
);
413 } else if (r
== -E2BIG
) {
414 log_notice_errno(E2BIG
, "Entry too big, skipped");
416 } else if (r
== -EAGAIN
) {
419 log_debug_errno(r
, "Closing connection: %m");
420 remove_source(s
, fd
);
426 static int dispatch_raw_source_until_block(sd_event_source
*event
,
428 RemoteSource
*source
= userdata
;
431 /* Make sure event stays around even if source is destroyed */
432 sd_event_source_ref(event
);
434 r
= journal_remote_handle_raw_source(event
, source
->importer
.fd
, EPOLLIN
, journal_remote_server_global
);
436 /* No more data for now */
437 sd_event_source_set_enabled(event
, SD_EVENT_OFF
);
439 sd_event_source_unref(event
);
444 static int dispatch_raw_source_event(sd_event_source
*event
,
448 RemoteSource
*source
= userdata
;
451 assert(source
->event
);
452 assert(source
->buffer_event
);
454 r
= journal_remote_handle_raw_source(event
, fd
, EPOLLIN
, journal_remote_server_global
);
456 /* Might have more data. We need to rerun the handler
457 * until we are sure the buffer is exhausted. */
458 sd_event_source_set_enabled(source
->buffer_event
, SD_EVENT_ON
);
463 static int dispatch_blocking_source_event(sd_event_source
*event
,
465 RemoteSource
*source
= userdata
;
467 return journal_remote_handle_raw_source(event
, source
->importer
.fd
, EPOLLIN
, journal_remote_server_global
);
470 static int accept_connection(const char* type
, int fd
,
471 SocketAddress
*addr
, char **hostname
) {
474 log_debug("Accepting new %s connection on fd:%d", type
, fd
);
475 fd2
= accept4(fd
, &addr
->sockaddr
.sa
, &addr
->size
, SOCK_NONBLOCK
|SOCK_CLOEXEC
);
477 return log_error_errno(errno
, "accept() on fd:%d failed: %m", fd
);
479 switch(socket_address_family(addr
)) {
482 _cleanup_free_
char *a
= NULL
;
485 r
= socket_address_print(addr
, &a
);
487 log_error_errno(r
, "socket_address_print(): %m");
492 r
= socknameinfo_pretty(&addr
->sockaddr
, addr
->size
, &b
);
494 log_error_errno(r
, "Resolving hostname failed: %m");
499 log_debug("Accepted %s %s connection from %s",
501 socket_address_family(addr
) == AF_INET
? "IP" : "IPv6",
509 log_error("Rejected %s connection with unsupported family %d",
510 type
, socket_address_family(addr
));
517 static int dispatch_raw_connection_event(sd_event_source
*event
,
521 RemoteServer
*s
= userdata
;
523 SocketAddress addr
= {
524 .size
= sizeof(union sockaddr_union
),
527 char *hostname
= NULL
;
529 fd2
= accept_connection("raw", fd
, &addr
, &hostname
);
533 return journal_remote_add_source(s
, fd2
, hostname
, true);