]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
journald: rework the Synchronize() varlink logic
authorLennart Poettering <lennart@poettering.net>
Tue, 25 Mar 2025 20:46:18 +0000 (16:46 -0400)
committerLennart Poettering <lennart@poettering.net>
Tue, 13 May 2025 13:39:57 +0000 (15:39 +0200)
Previously, if the Synchronize() varlink call is issued we'd wait for
journald to become idle before returning success. That is problematic
however: on a busy system journald might never become idle. Hence, let's
beef up the logic to ensure that we do not wait longer than necessary:
i.e. we make sure we process any data enqueued before the sync request
was submitted, but not more.

Implementing this isn't trivial unfortunately. To deal with this
reasonably, we need to determine somehow for incoming log messages
whether they are from before or after the point in time where the sync
requested was received.

For AF_UNIX/SOCK_DGRAM we can use SO_TIMESTAMP to directly compare
timestamps of incoming messages with the timestamp of the sync request
(unfortunately only CLOCK_REALTIME).

For AF_UNIX/SOCK_STREAM we can call SIOCINQ at the moment we initiate
the sync, and then continue processing incoming traffic, counting down
the bytes until the SIOCINQ returned bytes have been processed. All
further data must have been enqueued later hence.

With those two mechanisms in place we can relatively reliably
synchronize the journal.

This also adds a boolean argument "offline" to the Synchronize() call,
which controls whether to offline the journal after processing the
pending messages. it defaults to true, for compat with the status quo
ante. But for most cases the offlining is probably not necessary, and is
cheaper to do without, hence allow not to do it.

src/journal/journald-kmsg.c
src/journal/journald-server.c
src/journal/journald-server.h
src/journal/journald-stream.c
src/journal/journald-stream.h
src/journal/journald-sync.c [new file with mode: 0644]
src/journal/journald-sync.h [new file with mode: 0644]
src/journal/journald-varlink.c
src/journal/journald-varlink.h
src/journal/meson.build
src/shared/varlink-io.systemd.Journal.c

index 96ee2320de25ab5b65d60b095720654f291cafd2..767e590b260f789a438b9fb59fb71b2c9a3ac296 100644 (file)
@@ -160,7 +160,7 @@ void dev_kmsg_record(Server *s, char *p, size_t l) {
                 *s->kernel_seqnum = serial + 1;
         }
 
-        /* monotonic timestamp */
+        /* CLOCK_BOOTTIME timestamp */
         l -= (e - p) + 1;
         p = e + 1;
         e = memchr(p, ',', l);
@@ -316,6 +316,9 @@ void dev_kmsg_record(Server *s, char *p, size_t l) {
         if (saved_log_max_level != INT_MAX)
                 log_set_max_level(saved_log_max_level);
 
+        s->dev_kmsg_timestamp = usec;
+        sync_req_revalidate_by_timestamp(s);
+
 finish:
         for (j = 0; j < z; j++)
                 free(iovec[j].iov_base);
index 0593b97b1602abf573635397ab88b570ab876254..419dc040cb75be5ee64e5973e93b3ca92710b8c8 100644 (file)
@@ -1596,6 +1596,9 @@ int server_process_datagram(
                         log_ratelimit_warning(JOURNAL_LOG_RATELIMIT,
                                               "Got file descriptors via syslog socket. Ignoring.");
 
+                if (tv)
+                        s->syslog_timestamp = timeval_load(tv);
+
         } else if (fd == s->native_fd) {
                 if (n > 0 && n_fds == 0)
                         server_process_native_message(s, s->buffer, n, ucred, tv, label, label_len);
@@ -1605,6 +1608,9 @@ int server_process_datagram(
                         log_ratelimit_warning(JOURNAL_LOG_RATELIMIT,
                                               "Got too many file descriptors via native socket. Ignoring.");
 
+                if (tv)
+                        s->native_timestamp = timeval_load(tv);
+
         } else {
                 assert(fd == s->audit_fd);
 
@@ -1617,6 +1623,9 @@ int server_process_datagram(
 
         close_many(fds, n_fds);
 
+        if (tv)
+                sync_req_revalidate_by_timestamp(s);
+
         server_refresh_idle_timer(s);
         return 0;
 }
@@ -2753,6 +2762,15 @@ Server* server_free(Server *s) {
 
         mmap_cache_unref(s->mmap);
 
+        SyncReq *req;
+        while ((req = prioq_peek(s->sync_req_realtime_prioq)))
+                sync_req_free(req);
+        prioq_free(s->sync_req_realtime_prioq);
+
+        while ((req = prioq_peek(s->sync_req_boottime_prioq)))
+                sync_req_free(req);
+        prioq_free(s->sync_req_boottime_prioq);
+
         return mfree(s);
 }
 
index e565769779eb0017786599c355f4e51871124af1..fd07ca8d07755d609425b2396bd6f0a48d5e8b0b 100644 (file)
@@ -13,6 +13,7 @@
 #include "journal-file.h"
 #include "journald-context.h"
 #include "journald-stream.h"
+#include "journald-sync.h"
 #include "list.h"
 #include "prioq.h"
 #include "ratelimit.h"
@@ -183,6 +184,17 @@ typedef struct Server {
         ClientContext *pid1_context; /* the context of PID 1 */
 
         sd_varlink_server *varlink_server;
+
+        /* timestamp of most recently processed log messages from each source (CLOCK_REALTIME for the first
+         * two, CLOCK_BOOTTIME for the other) */
+        usec_t native_timestamp, syslog_timestamp, dev_kmsg_timestamp;
+
+        /* Pending synchronization requests, ordered by their timestamp */
+        Prioq *sync_req_realtime_prioq;
+        Prioq *sync_req_boottime_prioq;
+
+        /* Pending synchronization requests with non-zero rqlen counter */
+        LIST_HEAD(SyncReq, sync_req_pending_rqlen);
 } Server;
 
 #define SERVER_MACHINE_ID(s) ((s)->machine_id_field + STRLEN("_MACHINE_ID="))
index 494f906ade761d37c3e1e214650b46a28f70d6fb..4afcc7be03e1cee684edaa35a85a2c35f4883554 100644 (file)
  * let's enforce a line length matching the maximum unit name length (255) */
 #define STDOUT_STREAM_SETUP_PROTOCOL_LINE_MAX (UNIT_NAME_MAX-1U)
 
-typedef enum StdoutStreamState {
-        STDOUT_STREAM_IDENTIFIER,
-        STDOUT_STREAM_UNIT_ID,
-        STDOUT_STREAM_PRIORITY,
-        STDOUT_STREAM_LEVEL_PREFIX,
-        STDOUT_STREAM_FORWARD_TO_SYSLOG,
-        STDOUT_STREAM_FORWARD_TO_KMSG,
-        STDOUT_STREAM_FORWARD_TO_CONSOLE,
-        STDOUT_STREAM_RUNNING,
-} StdoutStreamState;
-
 /* The different types of log record terminators: a real \n was read, a NUL character was read, the maximum line length
  * was reached, or the end of the stream was reached */
 
@@ -73,44 +62,13 @@ typedef enum LineBreak {
         _LINE_BREAK_INVALID = -EINVAL,
 } LineBreak;
 
-struct StdoutStream {
-        Server *server;
-        StdoutStreamState state;
-
-        int fd;
-
-        struct ucred ucred;
-        char *label;
-        char *identifier;
-        char *unit_id;
-        int priority;
-        bool level_prefix:1;
-        bool forward_to_syslog:1;
-        bool forward_to_kmsg:1;
-        bool forward_to_console:1;
-
-        bool fdstore:1;
-        bool in_notify_queue:1;
-
-        char *buffer;
-        size_t length;
-
-        sd_event_source *event_source;
-
-        char *state_file;
-
-        ClientContext *context;
-
-        LIST_FIELDS(StdoutStream, stdout_stream);
-        LIST_FIELDS(StdoutStream, stdout_stream_notify_queue);
-
-        char id_field[STRLEN("_STREAM_ID=") + SD_ID128_STRING_MAX];
-};
-
 StdoutStream* stdout_stream_free(StdoutStream *s) {
         if (!s)
                 return NULL;
 
+        while (s->stream_sync_reqs)
+                stream_sync_req_free(s->stream_sync_reqs);
+
         if (s->server) {
                 if (s->context)
                         client_context_release(s->server, s->context);
@@ -121,8 +79,6 @@ StdoutStream* stdout_stream_free(StdoutStream *s) {
 
                 if (s->in_notify_queue)
                         LIST_REMOVE(stdout_stream_notify_queue, s->server->stdout_streams_notify_queue, s);
-
-                (void) server_start_or_stop_idle_timer(s->server); /* Maybe we are idle now? */
         }
 
         sd_event_source_disable_unref(s->event_source);
@@ -145,7 +101,16 @@ void stdout_stream_terminate(StdoutStream *s) {
         if (s->state_file)
                 (void) unlink(s->state_file);
 
-        stdout_stream_free(s);
+        StreamSyncReq *ssr;
+        while ((ssr = s->stream_sync_reqs)) {
+                SyncReq *req = ssr->req;
+                stream_sync_req_free(TAKE_PTR(ssr));
+                sync_req_revalidate(TAKE_PTR(req));
+        }
+
+        Server *server = s->server;
+        stdout_stream_free(TAKE_PTR(s));
+        (void) server_start_or_stop_idle_timer(server); /* Maybe we are idle now? */
 }
 
 static int stdout_stream_save(StdoutStream *s) {
@@ -647,6 +612,10 @@ static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents,
         s->length = l - consumed;
         memmove(s->buffer, p + consumed, s->length);
 
+        LIST_FOREACH(by_stdout_stream, ssr, s->stream_sync_reqs)
+                /* NB: this might invalidate the stdout stream! */
+                stream_sync_req_advance_revalidate(ssr, consumed);
+
         return 1;
 
 terminate:
@@ -747,14 +716,23 @@ static int stdout_stream_new(sd_event_source *es, int listen_fd, uint32_t revent
                 fd = safe_close(fd);
 
                 server_driver_message(s, u.pid, LOG_MESSAGE("Too many stdout streams, refusing connection."));
+
+                server_notify_stream(s, /* stream= */ NULL);
                 return 0;
         }
 
-        r = stdout_stream_install(s, fd, NULL);
-        if (r < 0)
+        StdoutStream *stream;
+        r = stdout_stream_install(s, fd, &stream);
+        if (r < 0) {
+                server_notify_stream(s, /* stream= */ NULL);
                 return r;
+        }
 
         TAKE_FD(fd);
+
+        /* Tell the synchronization logic that we dropped one item from the incoming connection queue */
+        server_notify_stream(s, stream);
+
         return 0;
 }
 
index 4bb4cbbe2fd1fd26844ccc16ba26ae04a0e3e2ca..ba15b97a372976c747627335057fa07245e665ca 100644 (file)
@@ -1,10 +1,59 @@
 /* SPDX-License-Identifier: LGPL-2.1-or-later */
 #pragma once
 
-#include "fdset.h"
-
 typedef struct Server Server;
 typedef struct StdoutStream StdoutStream;
+typedef struct StreamSyncReq StreamSyncReq;
+
+#include "fdset.h"
+#include "list.h"
+
+typedef enum StdoutStreamState {
+        STDOUT_STREAM_IDENTIFIER,
+        STDOUT_STREAM_UNIT_ID,
+        STDOUT_STREAM_PRIORITY,
+        STDOUT_STREAM_LEVEL_PREFIX,
+        STDOUT_STREAM_FORWARD_TO_SYSLOG,
+        STDOUT_STREAM_FORWARD_TO_KMSG,
+        STDOUT_STREAM_FORWARD_TO_CONSOLE,
+        STDOUT_STREAM_RUNNING,
+} StdoutStreamState;
+
+struct StdoutStream {
+        Server *server;
+        StdoutStreamState state;
+
+        int fd;
+
+        struct ucred ucred;
+        char *label;
+        char *identifier;
+        char *unit_id;
+        int priority;
+        bool level_prefix:1;
+        bool forward_to_syslog:1;
+        bool forward_to_kmsg:1;
+        bool forward_to_console:1;
+
+        bool fdstore:1;
+        bool in_notify_queue:1;
+
+        char *buffer;
+        size_t length;
+
+        sd_event_source *event_source;
+
+        char *state_file;
+
+        ClientContext *context;
+
+        LIST_FIELDS(StdoutStream, stdout_stream);
+        LIST_FIELDS(StdoutStream, stdout_stream_notify_queue);
+
+        char id_field[STRLEN("_STREAM_ID=") + SD_ID128_STRING_MAX];
+
+        LIST_HEAD(StreamSyncReq, stream_sync_reqs);
+};
 
 int server_open_stdout_socket(Server *s, const char *stdout_socket);
 int server_restore_streams(Server *s, FDSet *fds);
diff --git a/src/journal/journald-sync.c b/src/journal/journald-sync.c
new file mode 100644 (file)
index 0000000..c3b8de7
--- /dev/null
@@ -0,0 +1,372 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+
+#include <linux/sockios.h>
+#include <sys/ioctl.h>
+
+#include "sd-varlink.h"
+
+#include "io-util.h"
+#include "journald-server.h"
+#include "journald-stream.h"
+#include "journald-sync.h"
+#include "journald-varlink.h"
+#include "socket-netlink.h"
+#include "time-util.h"
+
+StreamSyncReq *stream_sync_req_free(StreamSyncReq *ssr) {
+        if (!ssr)
+                return NULL;
+
+        if (ssr->req)
+                LIST_REMOVE(by_sync_req, ssr->req->stream_sync_reqs, ssr);
+        if (ssr->stream)
+                LIST_REMOVE(by_stdout_stream, ssr->stream->stream_sync_reqs, ssr);
+
+        return mfree(ssr);
+}
+
+void stream_sync_req_advance_revalidate(StreamSyncReq *ssr, size_t p) {
+        assert(ssr);
+
+        /* Subtract the specified number of bytes from the byte counter. And when we hit zero we consider
+         * this stream processed for the synchronization request */
+
+        /* NB: This might invalidate the 'ssr' object! */
+
+        if (p < ssr->pending_siocinq) {
+                ssr->pending_siocinq -= p;
+                return;
+        }
+
+        SyncReq *req = ASSERT_PTR(ssr->req);
+        stream_sync_req_free(TAKE_PTR(ssr));
+
+        /* Maybe we are done now? */
+        sync_req_revalidate(TAKE_PTR(req));
+}
+
+static bool sync_req_is_complete(SyncReq *req) {
+        int r;
+
+        assert(req);
+        assert(req->server);
+
+        /* In case the clock jumped backwards, let's adjust the timestamp, to guarantee reasonably quick
+         * termination */
+        usec_t n = now(CLOCK_REALTIME);
+        if (n < req->realtime)
+                req->realtime = n;
+
+        if (req->realtime_prioq_idx != PRIOQ_IDX_NULL) {
+                /* If this sync request is still in the priority queue it means we still need to check if
+                 * incoming message timestamps are now newer than then sync request timestamp. */
+
+                if (req->server->native_fd >= 0 &&
+                    req->server->native_timestamp < req->realtime) {
+                        r = fd_wait_for_event(req->server->native_fd, POLLIN, /* timeout= */ 0);
+                        if (r < 0)
+                                log_debug_errno(r, "Failed to determine pending IO events of native socket, ignoring: %m");
+                        else if (r != 0) /* if there's more queued we need to wait for the timestamp to pass. If it's idle though we are done here. */
+                                return false;
+                }
+
+                if (req->server->syslog_fd >= 0&&
+                    req->server->syslog_timestamp < req->realtime) {
+                        r = fd_wait_for_event(req->server->syslog_fd, POLLIN, /* timeout= */ 0);
+                        if (r < 0)
+                                log_debug_errno(r, "Failed to determine pending IO events of syslog socket, ignoring: %m");
+                        else if (r != 0)
+                                return false;
+                }
+
+                /* This sync request is fulfilled for the native + syslog datagram streams? Then, let's
+                 * remove this sync request from the priority queue, so that we dont need to consider it
+                 * anymore. */
+                assert(prioq_remove(req->server->sync_req_realtime_prioq, req, &req->realtime_prioq_idx) > 0);
+        }
+
+        if (req->boottime_prioq_idx != PRIOQ_IDX_NULL) {
+                /* Very similar to the above, but for /dev/kmsg we operate on the CLOCK_BOOTTIME clock */
+
+                if (req->server->dev_kmsg_fd >= 0 &&
+                    req->server->dev_kmsg_timestamp < req->boottime) {
+                        r = fd_wait_for_event(req->server->dev_kmsg_fd, POLLIN, /* timeout= */ 0);
+                        if (r < 0)
+                                log_debug_errno(r, "Failed to determine pending IO events of /dev/kmsg file descriptor, ignoring: %m");
+                        else if (r != 0)
+                                return false;
+                }
+
+                assert(prioq_remove(req->server->sync_req_boottime_prioq, req, &req->boottime_prioq_idx) > 0);
+        }
+
+        /* If there are still streams with pending counters, we still need to look into things */
+        if (req->stream_sync_reqs)
+                return false;
+
+        /* If there are still pending connections from before the sync started, we still need to look into things */
+        if (req->pending_rqlen > 0)
+                return false;
+
+        return true;
+}
+
+static int on_idle(sd_event_source *s, void *userdata) {
+        SyncReq *req = ASSERT_PTR(userdata);
+
+        req->idle_event_source = sd_event_source_disable_unref(req->idle_event_source);
+
+        /* When this idle event triggers, then we definitely are done with the synchronization request. This
+         * is a safety net of a kind, to ensure we'll definitely put an end to any synchronization request,
+         * even if we are confused by CLOCK_REALTIME jumps or similar. */
+        sync_req_varlink_reply(TAKE_PTR(req));
+        return 0;
+}
+
+SyncReq* sync_req_free(SyncReq *req) {
+        if (!req)
+                return NULL;
+
+        if (req->server) {
+                if (req->realtime_prioq_idx != PRIOQ_IDX_NULL)
+                        assert_se(prioq_remove(req->server->sync_req_realtime_prioq, req, &req->realtime_prioq_idx) > 0);
+
+                if (req->boottime_prioq_idx != PRIOQ_IDX_NULL)
+                        assert_se(prioq_remove(req->server->sync_req_boottime_prioq, req, &req->boottime_prioq_idx) > 0);
+
+                if (req->pending_rqlen > 0)
+                        LIST_REMOVE(pending_rqlen, req->server->sync_req_pending_rqlen, req);
+        }
+
+        req->idle_event_source = sd_event_source_disable_unref(req->idle_event_source);
+
+        sd_varlink_unref(req->link);
+
+        while (req->stream_sync_reqs)
+                stream_sync_req_free(req->stream_sync_reqs);
+
+        return mfree(req);
+}
+
+static int sync_req_realtime_compare(const SyncReq *x, const SyncReq *y) {
+        return CMP(ASSERT_PTR(x)->realtime, ASSERT_PTR(y)->realtime);
+}
+
+static int sync_req_boottime_compare(const SyncReq *x, const SyncReq *y) {
+        return CMP(ASSERT_PTR(x)->boottime, ASSERT_PTR(y)->boottime);
+}
+
+static int sync_req_add_stream(SyncReq *req, StdoutStream *ss) {
+        assert(req);
+        assert(ss);
+
+        int v = 0;
+        if (ioctl(ss->fd, SIOCINQ, &v) < 0)
+                log_debug_errno(errno, "Failed to issue SIOCINQ on stream socket, ignoring: %m");
+        if (v <= 0)
+                return 0; /* Pending messages are zero anyway? then there's nothing to track */
+
+        _cleanup_(stream_sync_req_freep) StreamSyncReq *ssr = new(StreamSyncReq, 1);
+        if (!ssr)
+                return -ENOMEM;
+
+        *ssr = (StreamSyncReq) {
+                .stream = ss,
+                .pending_siocinq = v,
+                .req = req,
+        };
+
+        LIST_PREPEND(by_sync_req, req->stream_sync_reqs, ssr);
+        LIST_PREPEND(by_stdout_stream, ss->stream_sync_reqs, ssr);
+
+        TAKE_PTR(ssr);
+        return 1;
+}
+
+int sync_req_new(Server *s, sd_varlink *link, SyncReq **ret) {
+        int r;
+
+        assert(s);
+        assert(link);
+        assert(ret);
+
+        _cleanup_(sync_req_freep) SyncReq *req = new(SyncReq, 1);
+        if (!req)
+                return -ENOMEM;
+
+        *req = (SyncReq) {
+                .server = s,
+                .link = sd_varlink_ref(link),
+                .realtime_prioq_idx = PRIOQ_IDX_NULL,
+                .boottime_prioq_idx = PRIOQ_IDX_NULL,
+        };
+
+        /* We use five distinct mechanisms to determine when the synchronization request is complete:
+         *
+         * 1. For the syslog/native AF_UNIX/SOCK_DGRAM sockets we look at the datagram timestamps: once the
+         *    most recently seen datagram on the socket is newer than the timestamp when we initiated the
+         *    sync request we know that all previously enqueued messages have been processed by us.
+         *
+         * 2. For established stream AF_UNIX/SOCK_STREAM sockets we have no timestamps. For them we take the
+         *    SIOCINQ counter at the moment the synchronization request was enqueued. And once we processed
+         *    the indicated number of input bytes we know that anything further was enqueued later than the
+         *    original synchronization request we started from.
+         *
+         * 3. For pending new, un-accept()ed stream AF_UNIX/SOCK_STREAM sockets we have no timestamps either,
+         *    but we can query the number of pending connections via the sockdiag netlink protocol (I so wish
+         *    there was an easier, quicker way!). Once we accept()ed that many connections we know all
+         *    further connections are definitely more recent than the sync request.
+         *
+         * 4. For /dev/kmsg we look at the log message timestamps, similar to the AF_UNIX/SOCK_DGRAM case,
+         *    and they are in CLOCK_BOOTTIME clock.
+         *
+         * 5. Finally, as safety net we install an idle handler with a very low priority (lower than the
+         *    syslog/native/stream IO handlers). If this handler is called we know that there's no pending
+         *    IO, hence everything so far queued is definitely processed.
+         *
+         * Note the asymmetry: for AF_UNIX/SOCK_DGRAM + /dev/kmsg we go by timestamp, for established
+         * AF_UNIX/SOCK_STREAM we count bytes. That's because for SOCK_STREAM we have no timestamps, and for
+         * SOCK_DGRAM we have no API to query all pending bytes (as SIOCINQ on SOCK_DGRAM reports size of
+         * next datagram, not size of all pending datagrams). Ideally, we'd actually use neither of this, and
+         * the kernel would provide us CLOCK_MONOTONIC timestamps...
+         *
+         * Note that CLOCK_REALTIME is not necessarily monotonic (that's the whole point of it after all). If
+         * the clock jumps then we know the algorithm will eventually terminate, because of the idle handler
+         * that is our safety net. (Also, whenever we see poll() return an empty revents for some source we
+         * know everything is processed by now regardless of any timestamps or pending byte or connection
+         * counts.) */
+
+        req->realtime = now(CLOCK_REALTIME);
+        req->boottime = now(CLOCK_BOOTTIME);
+
+        if (s->native_event_source || s->syslog_event_source) {
+                r = prioq_ensure_put(&s->sync_req_realtime_prioq, sync_req_realtime_compare, req, &req->realtime_prioq_idx);
+                if (r < 0)
+                        return r;
+        }
+
+        if (s->dev_kmsg_event_source) {
+                r = prioq_ensure_put(&s->sync_req_boottime_prioq, sync_req_boottime_compare, req, &req->boottime_prioq_idx);
+                if (r < 0)
+                        return r;
+        }
+
+        r = sd_event_add_defer(s->event, &req->idle_event_source, on_idle, req);
+        if (r < 0)
+                return r;
+
+        r = sd_event_source_set_priority(req->idle_event_source, SD_EVENT_PRIORITY_NORMAL+15);
+        if (r < 0)
+                return r;
+
+        (void) sd_event_source_set_description(req->idle_event_source, "deferred-sync");
+
+        /* Now determine the pending byte counter for each stdout stream. If non-zero allocate a
+         * StreamSyncReq for the stream to keep track of it */
+        LIST_FOREACH(stdout_stream, ss, s->stdout_streams) {
+                r = sync_req_add_stream(req, ss);
+                if (r < 0)
+                        return r;
+        }
+
+        /* Also track how many pending, incoming stream sockets there are currently, so that we process them
+         * too */
+        r = af_unix_get_qlen(s->stdout_fd, &req->pending_rqlen);
+        if (r < 0)
+                log_warning_errno(r, "Failed to determine current incoming queue length, ignoring: %m");
+        if (req->pending_rqlen > 0)
+                LIST_PREPEND(pending_rqlen, s->sync_req_pending_rqlen, req);
+
+        *ret = TAKE_PTR(req);
+        return 0;
+}
+
+static void sync_req_advance_rqlen_revalidate(SyncReq *req, uint32_t current_rqlen, StdoutStream *ss) {
+        int r;
+
+        assert(req);
+
+        /* Invoked whenever a new connection was accept()ed, i.e. dropped off the queue of pending incoming
+         * connections. We decrease the qlen counter by one here, except if the new overall counter is
+         * already below our target. */
+
+        uint32_t n;
+        if (req->pending_rqlen <= 0)
+                n = 0;
+        else if (req->pending_rqlen > current_rqlen)
+                n = current_rqlen;
+        else
+                n = req->pending_rqlen - 1;
+
+        if (req->pending_rqlen > 0) {
+                /* if this synchronization request is supposed to process a non-zero number of connections we
+                 * need to also track what's inside those stream connections */
+                if (ss) {
+                        r = sync_req_add_stream(req, ss);
+                        if (r < 0)
+                                log_warning_errno(r, "Failed to track stream queue size, ignoring: %m");
+                }
+
+                /* If there are no more connections to wait for, remove us from the list of synchronization
+                 * requests with non-zero pending connection counters */
+                if (n == 0)
+                        LIST_REMOVE(pending_rqlen, req->server->sync_req_pending_rqlen, req);
+        }
+
+        req->pending_rqlen = n;
+
+        sync_req_revalidate(req);
+}
+
+void server_notify_stream(Server *s, StdoutStream *ss) {
+        int r;
+
+        assert(s);
+
+        /* Invoked whenever a new connection was accept()ed, i.e. dropped off the queue of pending incoming
+         * connections. */
+
+        if (!s->sync_req_pending_rqlen)
+                return;
+
+        uint32_t current_qlen;
+
+        r = af_unix_get_qlen(s->stdout_fd, &current_qlen);
+        if (r < 0) {
+                log_warning_errno(r, "Failed to determine current AF_UNIX stream socket pending connections, ignoring: %m");
+                current_qlen = UINT32_MAX;
+        }
+
+        LIST_FOREACH(pending_rqlen, sr, s->sync_req_pending_rqlen)
+                /* NB: this might invalidate the SyncReq object! */
+                sync_req_advance_rqlen_revalidate(sr, current_qlen, ss);
+}
+
+bool sync_req_revalidate(SyncReq *req) {
+        assert(req);
+
+        /* Check if the synchronization request is complete now. If so, answer the Varlink client. NB: this
+         * might invalidate the SyncReq object */
+
+        if (!sync_req_is_complete(req))
+                return false;
+
+        sync_req_varlink_reply(TAKE_PTR(req));
+        return true;
+}
+
+void sync_req_revalidate_by_timestamp(Server *s) {
+        assert(s);
+
+        /* Go through the pending sync requests by timestamp, and complete those for which a sync is now
+         * complete. */
+
+        SyncReq *req;
+        while ((req = prioq_peek(s->sync_req_realtime_prioq)))
+                if (!sync_req_revalidate(req))
+                        break;
+
+        while ((req = prioq_peek(s->sync_req_boottime_prioq)))
+                if (!sync_req_revalidate(req))
+                        break;
+}
diff --git a/src/journal/journald-sync.h b/src/journal/journald-sync.h
new file mode 100644 (file)
index 0000000..3f6d376
--- /dev/null
@@ -0,0 +1,56 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+#pragma once
+
+typedef struct Server Server;
+typedef struct StreamSyncReq StreamSyncReq;
+typedef struct SyncReq SyncReq;
+
+#include "journald-stream.h"
+#include "list.h"
+#include "macro.h"
+
+/* Encapsulates the synchronization request data we need to keep per STDOUT stream. Primarily a byte counter
+ * to count down. */
+struct StreamSyncReq {
+        SyncReq *req;
+        StdoutStream *stream;
+
+        uint64_t pending_siocinq; /* The SIOCINQ counter when the sync was initiated */
+
+        LIST_FIELDS(StreamSyncReq, by_sync_req);
+        LIST_FIELDS(StreamSyncReq, by_stdout_stream);
+};
+
+/* Encapsulates a synchronization request */
+struct SyncReq {
+        Server *server;
+        sd_varlink *link;
+
+        bool offline; /* if true, we'll offline the journal files after sync is complete */
+
+        usec_t realtime; /* CLOCK_REALTIME timestamp when synchronization request was initiated (for syncing on AF_UNIX/SOCK_DGRAM) */
+        usec_t boottime; /* CLOCK_BOOTTIME timestamp when synchronization request was initiated (for syncing on /dev/kmsg) */
+
+        sd_event_source *idle_event_source;
+
+        uint32_t pending_rqlen;   /* The rqlen counter on the stream AF_UNIX socket when the sync was initiated */
+        LIST_FIELDS(SyncReq, pending_rqlen);
+
+        LIST_HEAD(StreamSyncReq, stream_sync_reqs);
+
+        unsigned realtime_prioq_idx;
+        unsigned boottime_prioq_idx;
+};
+
+StreamSyncReq *stream_sync_req_free(StreamSyncReq *ssr);
+DEFINE_TRIVIAL_CLEANUP_FUNC(StreamSyncReq*, stream_sync_req_free);
+void stream_sync_req_advance_revalidate(StreamSyncReq *ssr, size_t p);
+
+int sync_req_new(Server *s, sd_varlink *link, SyncReq **ret);
+SyncReq* sync_req_free(SyncReq *req);
+DEFINE_TRIVIAL_CLEANUP_FUNC(SyncReq*, sync_req_free);
+
+bool sync_req_revalidate(SyncReq *req);
+void sync_req_revalidate_by_timestamp(Server *s);
+
+void server_notify_stream(Server *s, StdoutStream *ss);
index d2ba34c74792325182be08acd0250477bdb0b363..535b055f1668fc8e5b0188ab6e9b8f93312d20d1 100644 (file)
@@ -1,76 +1,64 @@
 /* SPDX-License-Identifier: LGPL-2.1-or-later */
 
 #include "journald-server.h"
+#include "journald-sync.h"
 #include "journald-varlink.h"
 #include "varlink-io.systemd.Journal.h"
 #include "varlink-io.systemd.service.h"
 #include "varlink-util.h"
 
-static int synchronize_second_half(sd_event_source *event_source, void *userdata) {
-        sd_varlink *link = ASSERT_PTR(userdata);
-        Server *s;
+void sync_req_varlink_reply(SyncReq *req) {
         int r;
 
-        assert_se(s = sd_varlink_get_userdata(link));
+        assert(req);
 
-        /* This is the "second half" of the Synchronize() varlink method. This function is called as deferred
-         * event source at a low priority to ensure the synchronization completes after all queued log
-         * messages are processed. */
-        server_full_sync(s, /* wait = */ true);
+        /* This is the "second half" of the Synchronize() varlink method. This function is called when we
+         * determine that no messages that were enqueued to us when the request was initiated is pending
+         * anymore. */
 
-        /* Let's get rid of the event source now, by marking it as non-floating again. It then has no ref
-         * anymore and is immediately destroyed after we return from this function, i.e. from this event
-         * source handler at the end. */
-        r = sd_event_source_set_floating(event_source, false);
-        if (r < 0)
-                return log_error_errno(r, "Failed to mark event source as non-floating: %m");
+        if (req->offline)
+                server_full_sync(req->server, /* wait = */ true);
 
-        return sd_varlink_reply(link, NULL);
-}
+        /* Disconnect the SyncReq from the Varlink connection object, and free it */
+        _cleanup_(sd_varlink_unrefp) sd_varlink *vl = TAKE_PTR(req->link);
+        sd_varlink_set_userdata(vl, req->server); /* reinstall server object */
+        req = sync_req_free(req);
 
-static void synchronize_destroy(void *userdata) {
-        sd_varlink_unref(userdata);
+        r = sd_varlink_reply(vl, NULL);
+        if (r < 0)
+                log_debug_errno(r, "Failed to reply to Synchronize() client, ignoring: %m");
 }
 
 static int vl_method_synchronize(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
-        _cleanup_(sd_event_source_unrefp) sd_event_source *event_source = NULL;
+        int offline = -1;
+
+        static const sd_json_dispatch_field dispatch_table[] = {
+                { "offline", SD_JSON_VARIANT_BOOLEAN, sd_json_dispatch_tristate, 0, 0},
+                {}
+        };
+
         Server *s = ASSERT_PTR(userdata);
         int r;
 
         assert(link);
 
-        r = sd_varlink_dispatch(link, parameters, /* dispatch_table = */ NULL, /* userdata = */ NULL);
+        r = sd_varlink_dispatch(link, parameters, dispatch_table, &offline);
         if (r != 0)
                 return r;
 
-        log_info("Received client request to sync journal.");
-
-        /* We don't do the main work now, but instead enqueue a deferred event loop job which will do
-         * it. That job is scheduled at low priority, so that we return from this method call only after all
-         * queued but not processed log messages are written to disk, so that this method call returning can
-         * be used as nice synchronization point. */
-        r = sd_event_add_defer(s->event, &event_source, synchronize_second_half, link);
-        if (r < 0)
-                return log_error_errno(r, "Failed to allocate defer event source: %m");
-
-        r = sd_event_source_set_destroy_callback(event_source, synchronize_destroy);
-        if (r < 0)
-                return log_error_errno(r, "Failed to set event source destroy callback: %m");
-
-        sd_varlink_ref(link); /* The varlink object is now left to the destroy callback to unref */
+        log_full(offline != 0 ? LOG_INFO : LOG_DEBUG,
+                 "Received client request to sync journal (%s offlining).", offline != 0 ? "with" : "without");
 
-        r = sd_event_source_set_priority(event_source, SD_EVENT_PRIORITY_NORMAL+15);
-        if (r < 0)
-                return log_error_errno(r, "Failed to set defer event source priority: %m");
+        _cleanup_(sync_req_freep) SyncReq *sr = NULL;
 
-        /* Give up ownership of this event source. It will now be destroyed along with event loop itself,
-         * unless it destroys itself earlier. */
-        r = sd_event_source_set_floating(event_source, true);
+        r = sync_req_new(s, link, &sr);
         if (r < 0)
-                return log_error_errno(r, "Failed to mark event source as floating: %m");
+                return r;
 
-        (void) sd_event_source_set_description(event_source, "deferred-sync");
+        sr->offline = offline != 0;
+        sd_varlink_set_userdata(link, sr);
 
+        sync_req_revalidate(TAKE_PTR(sr));
         return 0;
 }
 
@@ -145,6 +133,15 @@ static void vl_disconnect(sd_varlink_server *server, sd_varlink *link, void *use
         assert(server);
         assert(link);
 
+        void *u = sd_varlink_get_userdata(link);
+        if (u != s) {
+                /* If this is a Varlink connection that does not have the Server object as userdata, then it has a SyncReq object instead. Let's finish it. */
+
+                SyncReq *req = u;
+                sd_varlink_set_userdata(link, s); /* reinstall server object */
+                sync_req_free(req);
+        }
+
         (void) server_start_or_stop_idle_timer(s); /* maybe we are idle now */
 }
 
index ab34e01117bfb3d33f483395b445baef95dc89fc..e2c3ce40a0d776e683c648ec16d57cb6e558fb73 100644 (file)
@@ -1,6 +1,9 @@
 /* SPDX-License-Identifier: LGPL-2.1-or-later */
 #pragma once
 
-typedef struct Server Server;
+#include "journald-server.h"
+#include "journald-sync.h"
 
 int server_open_varlink(Server *s, const char *socket, int fd);
+
+void sync_req_varlink_reply(SyncReq *req);
index a7fcd9a41f08162699e09b9046198f7cfb2de7e2..1b8d9831c1a5dbe7a01f903841f7a84ae7bfcde3 100644 (file)
@@ -14,6 +14,7 @@ systemd_journald_extract_sources = files(
         'journald-server.c',
         'journald-socket.c',
         'journald-stream.c',
+        'journald-sync.c',
         'journald-syslog.c',
         'journald-varlink.c',
         'journald-wall.c',
index ddc36370860ec03c868310715c9977f01287be82..d4fd0916b6353573b3893d3ebaa8a4d3b2a23a8e 100644 (file)
@@ -2,7 +2,11 @@
 
 #include "varlink-io.systemd.Journal.h"
 
-static SD_VARLINK_DEFINE_METHOD(Synchronize);
+static SD_VARLINK_DEFINE_METHOD(
+                Synchronize,
+                SD_VARLINK_FIELD_COMMENT("Controls whether to offline the journal files as part of the synchronization operation."),
+                SD_VARLINK_DEFINE_INPUT(offline, SD_VARLINK_BOOL, 0));
+
 static SD_VARLINK_DEFINE_METHOD(Rotate);
 static SD_VARLINK_DEFINE_METHOD(FlushToVar);
 static SD_VARLINK_DEFINE_METHOD(RelinquishVar);