*s->kernel_seqnum = serial + 1;
}
- /* monotonic timestamp */
+ /* CLOCK_BOOTTIME timestamp */
l -= (e - p) + 1;
p = e + 1;
e = memchr(p, ',', l);
if (saved_log_max_level != INT_MAX)
log_set_max_level(saved_log_max_level);
+ s->dev_kmsg_timestamp = usec;
+ sync_req_revalidate_by_timestamp(s);
+
finish:
for (j = 0; j < z; j++)
free(iovec[j].iov_base);
log_ratelimit_warning(JOURNAL_LOG_RATELIMIT,
"Got file descriptors via syslog socket. Ignoring.");
+ if (tv)
+ s->syslog_timestamp = timeval_load(tv);
+
} else if (fd == s->native_fd) {
if (n > 0 && n_fds == 0)
server_process_native_message(s, s->buffer, n, ucred, tv, label, label_len);
log_ratelimit_warning(JOURNAL_LOG_RATELIMIT,
"Got too many file descriptors via native socket. Ignoring.");
+ if (tv)
+ s->native_timestamp = timeval_load(tv);
+
} else {
assert(fd == s->audit_fd);
close_many(fds, n_fds);
+ if (tv)
+ sync_req_revalidate_by_timestamp(s);
+
server_refresh_idle_timer(s);
return 0;
}
mmap_cache_unref(s->mmap);
+ SyncReq *req;
+ while ((req = prioq_peek(s->sync_req_realtime_prioq)))
+ sync_req_free(req);
+ prioq_free(s->sync_req_realtime_prioq);
+
+ while ((req = prioq_peek(s->sync_req_boottime_prioq)))
+ sync_req_free(req);
+ prioq_free(s->sync_req_boottime_prioq);
+
return mfree(s);
}
#include "journal-file.h"
#include "journald-context.h"
#include "journald-stream.h"
+#include "journald-sync.h"
#include "list.h"
#include "prioq.h"
#include "ratelimit.h"
ClientContext *pid1_context; /* the context of PID 1 */
sd_varlink_server *varlink_server;
+
+ /* timestamp of most recently processed log messages from each source (CLOCK_REALTIME for the first
+ * two, CLOCK_BOOTTIME for the other) */
+ usec_t native_timestamp, syslog_timestamp, dev_kmsg_timestamp;
+
+ /* Pending synchronization requests, ordered by their timestamp */
+ Prioq *sync_req_realtime_prioq;
+ Prioq *sync_req_boottime_prioq;
+
+ /* Pending synchronization requests with non-zero rqlen counter */
+ LIST_HEAD(SyncReq, sync_req_pending_rqlen);
} Server;
#define SERVER_MACHINE_ID(s) ((s)->machine_id_field + STRLEN("_MACHINE_ID="))
* let's enforce a line length matching the maximum unit name length (255) */
#define STDOUT_STREAM_SETUP_PROTOCOL_LINE_MAX (UNIT_NAME_MAX-1U)
-typedef enum StdoutStreamState {
- STDOUT_STREAM_IDENTIFIER,
- STDOUT_STREAM_UNIT_ID,
- STDOUT_STREAM_PRIORITY,
- STDOUT_STREAM_LEVEL_PREFIX,
- STDOUT_STREAM_FORWARD_TO_SYSLOG,
- STDOUT_STREAM_FORWARD_TO_KMSG,
- STDOUT_STREAM_FORWARD_TO_CONSOLE,
- STDOUT_STREAM_RUNNING,
-} StdoutStreamState;
-
/* The different types of log record terminators: a real \n was read, a NUL character was read, the maximum line length
* was reached, or the end of the stream was reached */
_LINE_BREAK_INVALID = -EINVAL,
} LineBreak;
-struct StdoutStream {
- Server *server;
- StdoutStreamState state;
-
- int fd;
-
- struct ucred ucred;
- char *label;
- char *identifier;
- char *unit_id;
- int priority;
- bool level_prefix:1;
- bool forward_to_syslog:1;
- bool forward_to_kmsg:1;
- bool forward_to_console:1;
-
- bool fdstore:1;
- bool in_notify_queue:1;
-
- char *buffer;
- size_t length;
-
- sd_event_source *event_source;
-
- char *state_file;
-
- ClientContext *context;
-
- LIST_FIELDS(StdoutStream, stdout_stream);
- LIST_FIELDS(StdoutStream, stdout_stream_notify_queue);
-
- char id_field[STRLEN("_STREAM_ID=") + SD_ID128_STRING_MAX];
-};
-
StdoutStream* stdout_stream_free(StdoutStream *s) {
if (!s)
return NULL;
+ while (s->stream_sync_reqs)
+ stream_sync_req_free(s->stream_sync_reqs);
+
if (s->server) {
if (s->context)
client_context_release(s->server, s->context);
if (s->in_notify_queue)
LIST_REMOVE(stdout_stream_notify_queue, s->server->stdout_streams_notify_queue, s);
-
- (void) server_start_or_stop_idle_timer(s->server); /* Maybe we are idle now? */
}
sd_event_source_disable_unref(s->event_source);
if (s->state_file)
(void) unlink(s->state_file);
- stdout_stream_free(s);
+ StreamSyncReq *ssr;
+ while ((ssr = s->stream_sync_reqs)) {
+ SyncReq *req = ssr->req;
+ stream_sync_req_free(TAKE_PTR(ssr));
+ sync_req_revalidate(TAKE_PTR(req));
+ }
+
+ Server *server = s->server;
+ stdout_stream_free(TAKE_PTR(s));
+ (void) server_start_or_stop_idle_timer(server); /* Maybe we are idle now? */
}
static int stdout_stream_save(StdoutStream *s) {
s->length = l - consumed;
memmove(s->buffer, p + consumed, s->length);
+ LIST_FOREACH(by_stdout_stream, ssr, s->stream_sync_reqs)
+ /* NB: this might invalidate the stdout stream! */
+ stream_sync_req_advance_revalidate(ssr, consumed);
+
return 1;
terminate:
fd = safe_close(fd);
server_driver_message(s, u.pid, LOG_MESSAGE("Too many stdout streams, refusing connection."));
+
+ server_notify_stream(s, /* stream= */ NULL);
return 0;
}
- r = stdout_stream_install(s, fd, NULL);
- if (r < 0)
+ StdoutStream *stream;
+ r = stdout_stream_install(s, fd, &stream);
+ if (r < 0) {
+ server_notify_stream(s, /* stream= */ NULL);
return r;
+ }
TAKE_FD(fd);
+
+ /* Tell the synchronization logic that we dropped one item from the incoming connection queue */
+ server_notify_stream(s, stream);
+
return 0;
}
/* SPDX-License-Identifier: LGPL-2.1-or-later */
#pragma once
-#include "fdset.h"
-
typedef struct Server Server;
typedef struct StdoutStream StdoutStream;
+typedef struct StreamSyncReq StreamSyncReq;
+
+#include "fdset.h"
+#include "list.h"
+
+typedef enum StdoutStreamState {
+ STDOUT_STREAM_IDENTIFIER,
+ STDOUT_STREAM_UNIT_ID,
+ STDOUT_STREAM_PRIORITY,
+ STDOUT_STREAM_LEVEL_PREFIX,
+ STDOUT_STREAM_FORWARD_TO_SYSLOG,
+ STDOUT_STREAM_FORWARD_TO_KMSG,
+ STDOUT_STREAM_FORWARD_TO_CONSOLE,
+ STDOUT_STREAM_RUNNING,
+} StdoutStreamState;
+
+struct StdoutStream {
+ Server *server;
+ StdoutStreamState state;
+
+ int fd;
+
+ struct ucred ucred;
+ char *label;
+ char *identifier;
+ char *unit_id;
+ int priority;
+ bool level_prefix:1;
+ bool forward_to_syslog:1;
+ bool forward_to_kmsg:1;
+ bool forward_to_console:1;
+
+ bool fdstore:1;
+ bool in_notify_queue:1;
+
+ char *buffer;
+ size_t length;
+
+ sd_event_source *event_source;
+
+ char *state_file;
+
+ ClientContext *context;
+
+ LIST_FIELDS(StdoutStream, stdout_stream);
+ LIST_FIELDS(StdoutStream, stdout_stream_notify_queue);
+
+ char id_field[STRLEN("_STREAM_ID=") + SD_ID128_STRING_MAX];
+
+ LIST_HEAD(StreamSyncReq, stream_sync_reqs);
+};
int server_open_stdout_socket(Server *s, const char *stdout_socket);
int server_restore_streams(Server *s, FDSet *fds);
--- /dev/null
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+
+#include <linux/sockios.h>
+#include <sys/ioctl.h>
+
+#include "sd-varlink.h"
+
+#include "io-util.h"
+#include "journald-server.h"
+#include "journald-stream.h"
+#include "journald-sync.h"
+#include "journald-varlink.h"
+#include "socket-netlink.h"
+#include "time-util.h"
+
+StreamSyncReq *stream_sync_req_free(StreamSyncReq *ssr) {
+ if (!ssr)
+ return NULL;
+
+ if (ssr->req)
+ LIST_REMOVE(by_sync_req, ssr->req->stream_sync_reqs, ssr);
+ if (ssr->stream)
+ LIST_REMOVE(by_stdout_stream, ssr->stream->stream_sync_reqs, ssr);
+
+ return mfree(ssr);
+}
+
+void stream_sync_req_advance_revalidate(StreamSyncReq *ssr, size_t p) {
+ assert(ssr);
+
+ /* Subtract the specified number of bytes from the byte counter. And when we hit zero we consider
+ * this stream processed for the synchronization request */
+
+ /* NB: This might invalidate the 'ssr' object! */
+
+ if (p < ssr->pending_siocinq) {
+ ssr->pending_siocinq -= p;
+ return;
+ }
+
+ SyncReq *req = ASSERT_PTR(ssr->req);
+ stream_sync_req_free(TAKE_PTR(ssr));
+
+ /* Maybe we are done now? */
+ sync_req_revalidate(TAKE_PTR(req));
+}
+
+static bool sync_req_is_complete(SyncReq *req) {
+ int r;
+
+ assert(req);
+ assert(req->server);
+
+ /* In case the clock jumped backwards, let's adjust the timestamp, to guarantee reasonably quick
+ * termination */
+ usec_t n = now(CLOCK_REALTIME);
+ if (n < req->realtime)
+ req->realtime = n;
+
+ if (req->realtime_prioq_idx != PRIOQ_IDX_NULL) {
+ /* If this sync request is still in the priority queue it means we still need to check if
+ * incoming message timestamps are now newer than then sync request timestamp. */
+
+ if (req->server->native_fd >= 0 &&
+ req->server->native_timestamp < req->realtime) {
+ r = fd_wait_for_event(req->server->native_fd, POLLIN, /* timeout= */ 0);
+ if (r < 0)
+ log_debug_errno(r, "Failed to determine pending IO events of native socket, ignoring: %m");
+ else if (r != 0) /* if there's more queued we need to wait for the timestamp to pass. If it's idle though we are done here. */
+ return false;
+ }
+
+ if (req->server->syslog_fd >= 0&&
+ req->server->syslog_timestamp < req->realtime) {
+ r = fd_wait_for_event(req->server->syslog_fd, POLLIN, /* timeout= */ 0);
+ if (r < 0)
+ log_debug_errno(r, "Failed to determine pending IO events of syslog socket, ignoring: %m");
+ else if (r != 0)
+ return false;
+ }
+
+ /* This sync request is fulfilled for the native + syslog datagram streams? Then, let's
+ * remove this sync request from the priority queue, so that we dont need to consider it
+ * anymore. */
+ assert(prioq_remove(req->server->sync_req_realtime_prioq, req, &req->realtime_prioq_idx) > 0);
+ }
+
+ if (req->boottime_prioq_idx != PRIOQ_IDX_NULL) {
+ /* Very similar to the above, but for /dev/kmsg we operate on the CLOCK_BOOTTIME clock */
+
+ if (req->server->dev_kmsg_fd >= 0 &&
+ req->server->dev_kmsg_timestamp < req->boottime) {
+ r = fd_wait_for_event(req->server->dev_kmsg_fd, POLLIN, /* timeout= */ 0);
+ if (r < 0)
+ log_debug_errno(r, "Failed to determine pending IO events of /dev/kmsg file descriptor, ignoring: %m");
+ else if (r != 0)
+ return false;
+ }
+
+ assert(prioq_remove(req->server->sync_req_boottime_prioq, req, &req->boottime_prioq_idx) > 0);
+ }
+
+ /* If there are still streams with pending counters, we still need to look into things */
+ if (req->stream_sync_reqs)
+ return false;
+
+ /* If there are still pending connections from before the sync started, we still need to look into things */
+ if (req->pending_rqlen > 0)
+ return false;
+
+ return true;
+}
+
+static int on_idle(sd_event_source *s, void *userdata) {
+ SyncReq *req = ASSERT_PTR(userdata);
+
+ req->idle_event_source = sd_event_source_disable_unref(req->idle_event_source);
+
+ /* When this idle event triggers, then we definitely are done with the synchronization request. This
+ * is a safety net of a kind, to ensure we'll definitely put an end to any synchronization request,
+ * even if we are confused by CLOCK_REALTIME jumps or similar. */
+ sync_req_varlink_reply(TAKE_PTR(req));
+ return 0;
+}
+
+SyncReq* sync_req_free(SyncReq *req) {
+ if (!req)
+ return NULL;
+
+ if (req->server) {
+ if (req->realtime_prioq_idx != PRIOQ_IDX_NULL)
+ assert_se(prioq_remove(req->server->sync_req_realtime_prioq, req, &req->realtime_prioq_idx) > 0);
+
+ if (req->boottime_prioq_idx != PRIOQ_IDX_NULL)
+ assert_se(prioq_remove(req->server->sync_req_boottime_prioq, req, &req->boottime_prioq_idx) > 0);
+
+ if (req->pending_rqlen > 0)
+ LIST_REMOVE(pending_rqlen, req->server->sync_req_pending_rqlen, req);
+ }
+
+ req->idle_event_source = sd_event_source_disable_unref(req->idle_event_source);
+
+ sd_varlink_unref(req->link);
+
+ while (req->stream_sync_reqs)
+ stream_sync_req_free(req->stream_sync_reqs);
+
+ return mfree(req);
+}
+
+static int sync_req_realtime_compare(const SyncReq *x, const SyncReq *y) {
+ return CMP(ASSERT_PTR(x)->realtime, ASSERT_PTR(y)->realtime);
+}
+
+static int sync_req_boottime_compare(const SyncReq *x, const SyncReq *y) {
+ return CMP(ASSERT_PTR(x)->boottime, ASSERT_PTR(y)->boottime);
+}
+
+static int sync_req_add_stream(SyncReq *req, StdoutStream *ss) {
+ assert(req);
+ assert(ss);
+
+ int v = 0;
+ if (ioctl(ss->fd, SIOCINQ, &v) < 0)
+ log_debug_errno(errno, "Failed to issue SIOCINQ on stream socket, ignoring: %m");
+ if (v <= 0)
+ return 0; /* Pending messages are zero anyway? then there's nothing to track */
+
+ _cleanup_(stream_sync_req_freep) StreamSyncReq *ssr = new(StreamSyncReq, 1);
+ if (!ssr)
+ return -ENOMEM;
+
+ *ssr = (StreamSyncReq) {
+ .stream = ss,
+ .pending_siocinq = v,
+ .req = req,
+ };
+
+ LIST_PREPEND(by_sync_req, req->stream_sync_reqs, ssr);
+ LIST_PREPEND(by_stdout_stream, ss->stream_sync_reqs, ssr);
+
+ TAKE_PTR(ssr);
+ return 1;
+}
+
+int sync_req_new(Server *s, sd_varlink *link, SyncReq **ret) {
+ int r;
+
+ assert(s);
+ assert(link);
+ assert(ret);
+
+ _cleanup_(sync_req_freep) SyncReq *req = new(SyncReq, 1);
+ if (!req)
+ return -ENOMEM;
+
+ *req = (SyncReq) {
+ .server = s,
+ .link = sd_varlink_ref(link),
+ .realtime_prioq_idx = PRIOQ_IDX_NULL,
+ .boottime_prioq_idx = PRIOQ_IDX_NULL,
+ };
+
+ /* We use five distinct mechanisms to determine when the synchronization request is complete:
+ *
+ * 1. For the syslog/native AF_UNIX/SOCK_DGRAM sockets we look at the datagram timestamps: once the
+ * most recently seen datagram on the socket is newer than the timestamp when we initiated the
+ * sync request we know that all previously enqueued messages have been processed by us.
+ *
+ * 2. For established stream AF_UNIX/SOCK_STREAM sockets we have no timestamps. For them we take the
+ * SIOCINQ counter at the moment the synchronization request was enqueued. And once we processed
+ * the indicated number of input bytes we know that anything further was enqueued later than the
+ * original synchronization request we started from.
+ *
+ * 3. For pending new, un-accept()ed stream AF_UNIX/SOCK_STREAM sockets we have no timestamps either,
+ * but we can query the number of pending connections via the sockdiag netlink protocol (I so wish
+ * there was an easier, quicker way!). Once we accept()ed that many connections we know all
+ * further connections are definitely more recent than the sync request.
+ *
+ * 4. For /dev/kmsg we look at the log message timestamps, similar to the AF_UNIX/SOCK_DGRAM case,
+ * and they are in CLOCK_BOOTTIME clock.
+ *
+ * 5. Finally, as safety net we install an idle handler with a very low priority (lower than the
+ * syslog/native/stream IO handlers). If this handler is called we know that there's no pending
+ * IO, hence everything so far queued is definitely processed.
+ *
+ * Note the asymmetry: for AF_UNIX/SOCK_DGRAM + /dev/kmsg we go by timestamp, for established
+ * AF_UNIX/SOCK_STREAM we count bytes. That's because for SOCK_STREAM we have no timestamps, and for
+ * SOCK_DGRAM we have no API to query all pending bytes (as SIOCINQ on SOCK_DGRAM reports size of
+ * next datagram, not size of all pending datagrams). Ideally, we'd actually use neither of this, and
+ * the kernel would provide us CLOCK_MONOTONIC timestamps...
+ *
+ * Note that CLOCK_REALTIME is not necessarily monotonic (that's the whole point of it after all). If
+ * the clock jumps then we know the algorithm will eventually terminate, because of the idle handler
+ * that is our safety net. (Also, whenever we see poll() return an empty revents for some source we
+ * know everything is processed by now regardless of any timestamps or pending byte or connection
+ * counts.) */
+
+ req->realtime = now(CLOCK_REALTIME);
+ req->boottime = now(CLOCK_BOOTTIME);
+
+ if (s->native_event_source || s->syslog_event_source) {
+ r = prioq_ensure_put(&s->sync_req_realtime_prioq, sync_req_realtime_compare, req, &req->realtime_prioq_idx);
+ if (r < 0)
+ return r;
+ }
+
+ if (s->dev_kmsg_event_source) {
+ r = prioq_ensure_put(&s->sync_req_boottime_prioq, sync_req_boottime_compare, req, &req->boottime_prioq_idx);
+ if (r < 0)
+ return r;
+ }
+
+ r = sd_event_add_defer(s->event, &req->idle_event_source, on_idle, req);
+ if (r < 0)
+ return r;
+
+ r = sd_event_source_set_priority(req->idle_event_source, SD_EVENT_PRIORITY_NORMAL+15);
+ if (r < 0)
+ return r;
+
+ (void) sd_event_source_set_description(req->idle_event_source, "deferred-sync");
+
+ /* Now determine the pending byte counter for each stdout stream. If non-zero allocate a
+ * StreamSyncReq for the stream to keep track of it */
+ LIST_FOREACH(stdout_stream, ss, s->stdout_streams) {
+ r = sync_req_add_stream(req, ss);
+ if (r < 0)
+ return r;
+ }
+
+ /* Also track how many pending, incoming stream sockets there are currently, so that we process them
+ * too */
+ r = af_unix_get_qlen(s->stdout_fd, &req->pending_rqlen);
+ if (r < 0)
+ log_warning_errno(r, "Failed to determine current incoming queue length, ignoring: %m");
+ if (req->pending_rqlen > 0)
+ LIST_PREPEND(pending_rqlen, s->sync_req_pending_rqlen, req);
+
+ *ret = TAKE_PTR(req);
+ return 0;
+}
+
+static void sync_req_advance_rqlen_revalidate(SyncReq *req, uint32_t current_rqlen, StdoutStream *ss) {
+ int r;
+
+ assert(req);
+
+ /* Invoked whenever a new connection was accept()ed, i.e. dropped off the queue of pending incoming
+ * connections. We decrease the qlen counter by one here, except if the new overall counter is
+ * already below our target. */
+
+ uint32_t n;
+ if (req->pending_rqlen <= 0)
+ n = 0;
+ else if (req->pending_rqlen > current_rqlen)
+ n = current_rqlen;
+ else
+ n = req->pending_rqlen - 1;
+
+ if (req->pending_rqlen > 0) {
+ /* if this synchronization request is supposed to process a non-zero number of connections we
+ * need to also track what's inside those stream connections */
+ if (ss) {
+ r = sync_req_add_stream(req, ss);
+ if (r < 0)
+ log_warning_errno(r, "Failed to track stream queue size, ignoring: %m");
+ }
+
+ /* If there are no more connections to wait for, remove us from the list of synchronization
+ * requests with non-zero pending connection counters */
+ if (n == 0)
+ LIST_REMOVE(pending_rqlen, req->server->sync_req_pending_rqlen, req);
+ }
+
+ req->pending_rqlen = n;
+
+ sync_req_revalidate(req);
+}
+
+void server_notify_stream(Server *s, StdoutStream *ss) {
+ int r;
+
+ assert(s);
+
+ /* Invoked whenever a new connection was accept()ed, i.e. dropped off the queue of pending incoming
+ * connections. */
+
+ if (!s->sync_req_pending_rqlen)
+ return;
+
+ uint32_t current_qlen;
+
+ r = af_unix_get_qlen(s->stdout_fd, ¤t_qlen);
+ if (r < 0) {
+ log_warning_errno(r, "Failed to determine current AF_UNIX stream socket pending connections, ignoring: %m");
+ current_qlen = UINT32_MAX;
+ }
+
+ LIST_FOREACH(pending_rqlen, sr, s->sync_req_pending_rqlen)
+ /* NB: this might invalidate the SyncReq object! */
+ sync_req_advance_rqlen_revalidate(sr, current_qlen, ss);
+}
+
+bool sync_req_revalidate(SyncReq *req) {
+ assert(req);
+
+ /* Check if the synchronization request is complete now. If so, answer the Varlink client. NB: this
+ * might invalidate the SyncReq object */
+
+ if (!sync_req_is_complete(req))
+ return false;
+
+ sync_req_varlink_reply(TAKE_PTR(req));
+ return true;
+}
+
+void sync_req_revalidate_by_timestamp(Server *s) {
+ assert(s);
+
+ /* Go through the pending sync requests by timestamp, and complete those for which a sync is now
+ * complete. */
+
+ SyncReq *req;
+ while ((req = prioq_peek(s->sync_req_realtime_prioq)))
+ if (!sync_req_revalidate(req))
+ break;
+
+ while ((req = prioq_peek(s->sync_req_boottime_prioq)))
+ if (!sync_req_revalidate(req))
+ break;
+}
--- /dev/null
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+#pragma once
+
+typedef struct Server Server;
+typedef struct StreamSyncReq StreamSyncReq;
+typedef struct SyncReq SyncReq;
+
+#include "journald-stream.h"
+#include "list.h"
+#include "macro.h"
+
+/* Encapsulates the synchronization request data we need to keep per STDOUT stream. Primarily a byte counter
+ * to count down. */
+struct StreamSyncReq {
+ SyncReq *req;
+ StdoutStream *stream;
+
+ uint64_t pending_siocinq; /* The SIOCINQ counter when the sync was initiated */
+
+ LIST_FIELDS(StreamSyncReq, by_sync_req);
+ LIST_FIELDS(StreamSyncReq, by_stdout_stream);
+};
+
+/* Encapsulates a synchronization request */
+struct SyncReq {
+ Server *server;
+ sd_varlink *link;
+
+ bool offline; /* if true, we'll offline the journal files after sync is complete */
+
+ usec_t realtime; /* CLOCK_REALTIME timestamp when synchronization request was initiated (for syncing on AF_UNIX/SOCK_DGRAM) */
+ usec_t boottime; /* CLOCK_BOOTTIME timestamp when synchronization request was initiated (for syncing on /dev/kmsg) */
+
+ sd_event_source *idle_event_source;
+
+ uint32_t pending_rqlen; /* The rqlen counter on the stream AF_UNIX socket when the sync was initiated */
+ LIST_FIELDS(SyncReq, pending_rqlen);
+
+ LIST_HEAD(StreamSyncReq, stream_sync_reqs);
+
+ unsigned realtime_prioq_idx;
+ unsigned boottime_prioq_idx;
+};
+
+StreamSyncReq *stream_sync_req_free(StreamSyncReq *ssr);
+DEFINE_TRIVIAL_CLEANUP_FUNC(StreamSyncReq*, stream_sync_req_free);
+void stream_sync_req_advance_revalidate(StreamSyncReq *ssr, size_t p);
+
+int sync_req_new(Server *s, sd_varlink *link, SyncReq **ret);
+SyncReq* sync_req_free(SyncReq *req);
+DEFINE_TRIVIAL_CLEANUP_FUNC(SyncReq*, sync_req_free);
+
+bool sync_req_revalidate(SyncReq *req);
+void sync_req_revalidate_by_timestamp(Server *s);
+
+void server_notify_stream(Server *s, StdoutStream *ss);
/* SPDX-License-Identifier: LGPL-2.1-or-later */
#include "journald-server.h"
+#include "journald-sync.h"
#include "journald-varlink.h"
#include "varlink-io.systemd.Journal.h"
#include "varlink-io.systemd.service.h"
#include "varlink-util.h"
-static int synchronize_second_half(sd_event_source *event_source, void *userdata) {
- sd_varlink *link = ASSERT_PTR(userdata);
- Server *s;
+void sync_req_varlink_reply(SyncReq *req) {
int r;
- assert_se(s = sd_varlink_get_userdata(link));
+ assert(req);
- /* This is the "second half" of the Synchronize() varlink method. This function is called as deferred
- * event source at a low priority to ensure the synchronization completes after all queued log
- * messages are processed. */
- server_full_sync(s, /* wait = */ true);
+ /* This is the "second half" of the Synchronize() varlink method. This function is called when we
+ * determine that no messages that were enqueued to us when the request was initiated is pending
+ * anymore. */
- /* Let's get rid of the event source now, by marking it as non-floating again. It then has no ref
- * anymore and is immediately destroyed after we return from this function, i.e. from this event
- * source handler at the end. */
- r = sd_event_source_set_floating(event_source, false);
- if (r < 0)
- return log_error_errno(r, "Failed to mark event source as non-floating: %m");
+ if (req->offline)
+ server_full_sync(req->server, /* wait = */ true);
- return sd_varlink_reply(link, NULL);
-}
+ /* Disconnect the SyncReq from the Varlink connection object, and free it */
+ _cleanup_(sd_varlink_unrefp) sd_varlink *vl = TAKE_PTR(req->link);
+ sd_varlink_set_userdata(vl, req->server); /* reinstall server object */
+ req = sync_req_free(req);
-static void synchronize_destroy(void *userdata) {
- sd_varlink_unref(userdata);
+ r = sd_varlink_reply(vl, NULL);
+ if (r < 0)
+ log_debug_errno(r, "Failed to reply to Synchronize() client, ignoring: %m");
}
static int vl_method_synchronize(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
- _cleanup_(sd_event_source_unrefp) sd_event_source *event_source = NULL;
+ int offline = -1;
+
+ static const sd_json_dispatch_field dispatch_table[] = {
+ { "offline", SD_JSON_VARIANT_BOOLEAN, sd_json_dispatch_tristate, 0, 0},
+ {}
+ };
+
Server *s = ASSERT_PTR(userdata);
int r;
assert(link);
- r = sd_varlink_dispatch(link, parameters, /* dispatch_table = */ NULL, /* userdata = */ NULL);
+ r = sd_varlink_dispatch(link, parameters, dispatch_table, &offline);
if (r != 0)
return r;
- log_info("Received client request to sync journal.");
-
- /* We don't do the main work now, but instead enqueue a deferred event loop job which will do
- * it. That job is scheduled at low priority, so that we return from this method call only after all
- * queued but not processed log messages are written to disk, so that this method call returning can
- * be used as nice synchronization point. */
- r = sd_event_add_defer(s->event, &event_source, synchronize_second_half, link);
- if (r < 0)
- return log_error_errno(r, "Failed to allocate defer event source: %m");
-
- r = sd_event_source_set_destroy_callback(event_source, synchronize_destroy);
- if (r < 0)
- return log_error_errno(r, "Failed to set event source destroy callback: %m");
-
- sd_varlink_ref(link); /* The varlink object is now left to the destroy callback to unref */
+ log_full(offline != 0 ? LOG_INFO : LOG_DEBUG,
+ "Received client request to sync journal (%s offlining).", offline != 0 ? "with" : "without");
- r = sd_event_source_set_priority(event_source, SD_EVENT_PRIORITY_NORMAL+15);
- if (r < 0)
- return log_error_errno(r, "Failed to set defer event source priority: %m");
+ _cleanup_(sync_req_freep) SyncReq *sr = NULL;
- /* Give up ownership of this event source. It will now be destroyed along with event loop itself,
- * unless it destroys itself earlier. */
- r = sd_event_source_set_floating(event_source, true);
+ r = sync_req_new(s, link, &sr);
if (r < 0)
- return log_error_errno(r, "Failed to mark event source as floating: %m");
+ return r;
- (void) sd_event_source_set_description(event_source, "deferred-sync");
+ sr->offline = offline != 0;
+ sd_varlink_set_userdata(link, sr);
+ sync_req_revalidate(TAKE_PTR(sr));
return 0;
}
assert(server);
assert(link);
+ void *u = sd_varlink_get_userdata(link);
+ if (u != s) {
+ /* If this is a Varlink connection that does not have the Server object as userdata, then it has a SyncReq object instead. Let's finish it. */
+
+ SyncReq *req = u;
+ sd_varlink_set_userdata(link, s); /* reinstall server object */
+ sync_req_free(req);
+ }
+
(void) server_start_or_stop_idle_timer(s); /* maybe we are idle now */
}
/* SPDX-License-Identifier: LGPL-2.1-or-later */
#pragma once
-typedef struct Server Server;
+#include "journald-server.h"
+#include "journald-sync.h"
int server_open_varlink(Server *s, const char *socket, int fd);
+
+void sync_req_varlink_reply(SyncReq *req);
'journald-server.c',
'journald-socket.c',
'journald-stream.c',
+ 'journald-sync.c',
'journald-syslog.c',
'journald-varlink.c',
'journald-wall.c',
#include "varlink-io.systemd.Journal.h"
-static SD_VARLINK_DEFINE_METHOD(Synchronize);
+static SD_VARLINK_DEFINE_METHOD(
+ Synchronize,
+ SD_VARLINK_FIELD_COMMENT("Controls whether to offline the journal files as part of the synchronization operation."),
+ SD_VARLINK_DEFINE_INPUT(offline, SD_VARLINK_BOOL, 0));
+
static SD_VARLINK_DEFINE_METHOD(Rotate);
static SD_VARLINK_DEFINE_METHOD(FlushToVar);
static SD_VARLINK_DEFINE_METHOD(RelinquishVar);