1 /* SPDX-License-Identifier: LGPL-2.1-or-later */
8 #include "alloc-util.h"
9 #include "errno-util.h"
12 #include "hash-funcs.h"
14 #include "journal-file-util.h"
15 #include "journal-remote.h"
16 #include "journal-remote-write.h"
18 #include "socket-util.h"
19 #include "stdio-util.h"
21 #define REMOTE_JOURNAL_PATH "/var/log/journal/remote"
23 #define filename_escape(s) xescape((s), "/ ")
25 static int open_output(RemoteServer
*s
, Writer
*w
, const char *host
) {
26 _cleanup_free_
char *_filename
= NULL
;
33 switch (s
->split_mode
) {
34 case JOURNAL_WRITE_SPLIT_NONE
:
38 case JOURNAL_WRITE_SPLIT_HOST
: {
39 _cleanup_free_
char *name
= NULL
;
43 name
= filename_escape(host
);
47 r
= asprintf(&_filename
, "%s/remote-%s.journal", s
->output
, name
);
59 r
= journal_file_open_reliably(
69 return log_error_errno(r
, "Failed to open output journal %s: %m", filename
);
71 log_debug("Opened output file %s", w
->journal
->path
);
75 /**********************************************************************
76 **********************************************************************
77 **********************************************************************/
79 static int init_writer_hashmap(RemoteServer
*s
) {
80 static const struct hash_ops
* const hash_ops
[] = {
81 [JOURNAL_WRITE_SPLIT_NONE
] = NULL
,
82 [JOURNAL_WRITE_SPLIT_HOST
] = &string_hash_ops
,
86 assert(s
->split_mode
>= 0 && s
->split_mode
< (int) ELEMENTSOF(hash_ops
));
88 s
->writers
= hashmap_new(hash_ops
[s
->split_mode
]);
95 int journal_remote_get_writer(RemoteServer
*s
, const char *host
, Writer
**writer
) {
96 _cleanup_(writer_unrefp
) Writer
*w
= NULL
;
103 switch (s
->split_mode
) {
104 case JOURNAL_WRITE_SPLIT_NONE
:
105 key
= "one and only";
108 case JOURNAL_WRITE_SPLIT_HOST
:
114 assert_not_reached();
117 w
= hashmap_get(s
->writers
, key
);
121 r
= writer_new(s
, &w
);
125 if (s
->split_mode
== JOURNAL_WRITE_SPLIT_HOST
) {
126 w
->hashmap_key
= strdup(key
);
131 r
= open_output(s
, w
, host
);
135 r
= hashmap_put(s
->writers
, w
->hashmap_key
?: key
, w
);
140 *writer
= TAKE_PTR(w
);
144 /**********************************************************************
145 **********************************************************************
146 **********************************************************************/
148 /* This should go away as soon as μhttpd allows state to be passed around. */
149 RemoteServer
*journal_remote_server_global
;
151 static int dispatch_raw_source_event(sd_event_source
*event
,
155 static int dispatch_raw_source_until_block(sd_event_source
*event
,
157 static int dispatch_blocking_source_event(sd_event_source
*event
,
159 static int dispatch_raw_connection_event(sd_event_source
*event
,
164 static int get_source_for_fd(RemoteServer
*s
,
165 int fd
, char *name
, RemoteSource
**source
) {
169 /* This takes ownership of name, but only on success. */
175 if (!GREEDY_REALLOC0(s
->sources
, fd
+ 1))
178 r
= journal_remote_get_writer(s
, name
, &writer
);
180 return log_warning_errno(r
, "Failed to get writer for source %s: %m",
183 if (!s
->sources
[fd
]) {
184 s
->sources
[fd
] = source_new(fd
, false, name
, writer
);
185 if (!s
->sources
[fd
]) {
186 writer_unref(writer
);
193 *source
= s
->sources
[fd
];
197 static int remove_source(RemoteServer
*s
, int fd
) {
198 RemoteSource
*source
;
201 assert(fd
>= 0 && fd
< (ssize_t
) MALLOC_ELEMENTSOF(s
->sources
));
203 source
= s
->sources
[fd
];
205 /* this closes fd too */
207 s
->sources
[fd
] = NULL
;
214 int journal_remote_add_source(RemoteServer
*s
, int fd
, char *name
, bool own_name
) {
215 RemoteSource
*source
= NULL
;
218 /* This takes ownership of name, even on failure, if own_name is true. */
230 r
= get_source_for_fd(s
, fd
, name
, &source
);
232 log_error_errno(r
, "Failed to create source for fd:%d (%s): %m",
238 r
= sd_event_add_io(s
->event
, &source
->event
,
239 fd
, EPOLLIN
|EPOLLRDHUP
|EPOLLPRI
,
240 dispatch_raw_source_event
, source
);
242 /* Add additional source for buffer processing. It will be
244 r
= sd_event_add_defer(s
->event
, &source
->buffer_event
,
245 dispatch_raw_source_until_block
, source
);
247 r
= sd_event_source_set_enabled(source
->buffer_event
, SD_EVENT_OFF
);
248 } else if (r
== -EPERM
) {
249 log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd
, name
);
250 r
= sd_event_add_defer(s
->event
, &source
->event
,
251 dispatch_blocking_source_event
, source
);
253 r
= sd_event_source_set_enabled(source
->event
, SD_EVENT_ON
);
256 log_error_errno(r
, "Failed to register event source for fd:%d: %m",
261 r
= sd_event_source_set_description(source
->event
, name
);
263 log_error_errno(r
, "Failed to set source name for fd:%d: %m", fd
);
267 return 1; /* work to do */
270 remove_source(s
, fd
);
274 int journal_remote_add_raw_socket(RemoteServer
*s
, int fd
) {
275 _unused_ _cleanup_close_
int fd_
= fd
;
276 char name
[STRLEN("raw-socket-") + DECIMAL_STR_MAX(int) + 1];
282 r
= sd_event_add_io(s
->event
, &s
->listen_event
,
284 dispatch_raw_connection_event
, s
);
288 xsprintf(name
, "raw-socket-%d", fd
);
290 r
= sd_event_source_set_description(s
->listen_event
, name
);
299 /**********************************************************************
300 **********************************************************************
301 **********************************************************************/
303 int journal_remote_server_init(
306 JournalWriteSplitMode split_mode
,
307 JournalFileFlags file_flags
) {
313 assert(journal_remote_server_global
== NULL
);
314 journal_remote_server_global
= s
;
316 s
->split_mode
= split_mode
;
317 s
->file_flags
= file_flags
;
321 else if (split_mode
== JOURNAL_WRITE_SPLIT_NONE
)
322 s
->output
= REMOTE_JOURNAL_PATH
"/remote.journal";
323 else if (split_mode
== JOURNAL_WRITE_SPLIT_HOST
)
324 s
->output
= REMOTE_JOURNAL_PATH
;
326 assert_not_reached();
328 r
= sd_event_default(&s
->event
);
330 return log_error_errno(r
, "Failed to allocate event loop: %m");
332 r
= init_writer_hashmap(s
);
339 void journal_remote_server_destroy(RemoteServer
*s
) {
345 hashmap_free(s
->daemons
);
347 for (i
= 0; i
< MALLOC_ELEMENTSOF(s
->sources
); i
++)
351 writer_unref(s
->_single_writer
);
352 hashmap_free(s
->writers
);
354 sd_event_source_unref(s
->listen_event
);
355 sd_event_unref(s
->event
);
357 if (s
== journal_remote_server_global
)
358 journal_remote_server_global
= NULL
;
360 /* fds that we're listening on remain open... */
363 /**********************************************************************
364 **********************************************************************
365 **********************************************************************/
367 int journal_remote_handle_raw_source(
368 sd_event_source
*event
,
373 RemoteSource
*source
;
376 /* Returns 1 if there might be more data pending,
377 * 0 if data is currently exhausted, negative on error.
381 assert(fd
>= 0 && fd
< (ssize_t
) MALLOC_ELEMENTSOF(s
->sources
));
382 source
= s
->sources
[fd
];
383 assert(source
->importer
.fd
== fd
);
385 r
= process_source(source
, s
->file_flags
);
386 if (journal_importer_eof(&source
->importer
)) {
389 log_debug("EOF reached with source %s (fd=%d)",
390 source
->importer
.name
, source
->importer
.fd
);
392 remaining
= journal_importer_bytes_remaining(&source
->importer
);
394 log_notice("Premature EOF. %zu bytes lost.", remaining
);
395 remove_source(s
, source
->importer
.fd
);
396 log_debug("%zu active sources remaining", s
->active
);
398 } else if (r
== -E2BIG
) {
399 log_notice("Entry with too many fields, skipped");
401 } else if (r
== -ENOBUFS
) {
402 log_notice("Entry too big, skipped");
404 } else if (r
== -EAGAIN
) {
407 log_debug_errno(r
, "Closing connection: %m");
408 remove_source(s
, fd
);
414 static int dispatch_raw_source_until_block(sd_event_source
*event
,
416 RemoteSource
*source
= ASSERT_PTR(userdata
);
421 /* Make sure event stays around even if source is destroyed */
422 sd_event_source_ref(event
);
424 r
= journal_remote_handle_raw_source(event
, source
->importer
.fd
, EPOLLIN
, journal_remote_server_global
);
428 /* No more data for now */
429 k
= sd_event_source_set_enabled(event
, SD_EVENT_OFF
);
434 sd_event_source_unref(event
);
439 static int dispatch_raw_source_event(sd_event_source
*event
,
443 RemoteSource
*source
= ASSERT_PTR(userdata
);
446 assert(source
->event
);
447 assert(source
->buffer_event
);
449 r
= journal_remote_handle_raw_source(event
, fd
, EPOLLIN
, journal_remote_server_global
);
453 /* Might have more data. We need to rerun the handler
454 * until we are sure the buffer is exhausted. */
455 k
= sd_event_source_set_enabled(source
->buffer_event
, SD_EVENT_ON
);
463 static int dispatch_blocking_source_event(sd_event_source
*event
,
465 RemoteSource
*source
= ASSERT_PTR(userdata
);
467 return journal_remote_handle_raw_source(event
, source
->importer
.fd
, EPOLLIN
, journal_remote_server_global
);
470 static int accept_connection(
476 _cleanup_close_
int fd2
= -EBADF
;
482 log_debug("Accepting new %s connection on fd:%d", type
, fd
);
483 fd2
= accept4(fd
, &addr
->sockaddr
.sa
, &addr
->size
, SOCK_NONBLOCK
|SOCK_CLOEXEC
);
485 if (ERRNO_IS_ACCEPT_AGAIN(errno
))
488 return log_error_errno(errno
, "accept() on fd:%d failed: %m", fd
);
491 switch (socket_address_family(addr
)) {
496 _cleanup_free_
char *a
= NULL
;
499 r
= socket_address_print(addr
, &a
);
501 return log_error_errno(r
, "socket_address_print(): %m");
503 r
= socknameinfo_pretty(&addr
->sockaddr
.sa
, addr
->size
, &b
);
505 return log_error_errno(r
, "Resolving hostname failed: %m");
507 log_debug("Accepted %s %s connection from %s",
509 af_to_ipv4_ipv6(socket_address_family(addr
)),
517 return log_error_errno(SYNTHETIC_ERRNO(EINVAL
),
518 "Rejected %s connection with unsupported family %d",
519 type
, socket_address_family(addr
));
523 static int dispatch_raw_connection_event(
524 sd_event_source
*event
,
529 RemoteServer
*s
= ASSERT_PTR(userdata
);
531 SocketAddress addr
= {
532 .size
= sizeof(union sockaddr_union
),
535 char *hostname
= NULL
;
537 fd2
= accept_connection("raw", fd
, &addr
, &hostname
);
543 return journal_remote_add_source(s
, fd2
, hostname
, true);