/* SPDX-License-Identifier: LGPL-2.1-or-later */
+#include <pthread.h>
+#include <unistd.h>
+
#include "chattr-util.h"
#include "fd-util.h"
#include "format-util.h"
#include "set.h"
#include "sync-util.h"
+static int journald_file_truncate(JournalFile *f) {
+ uint64_t p;
+ int r;
+
+ /* truncate excess from the end of archives */
+ r = journal_file_tail_end(f, &p);
+ if (r < 0)
+ return log_debug_errno(r, "Failed to determine end of tail object: %m");
+
+ /* arena_size can't exceed the file size, ensure it's updated before truncating */
+ f->header->arena_size = htole64(p - le64toh(f->header->header_size));
+
+ if (ftruncate(f->fd, p) < 0)
+ log_debug_errno(errno, "Failed to truncate %s: %m", f->path);
+
+ return 0;
+}
+
+static int journald_file_entry_array_punch_hole(JournalFile *f, uint64_t p, uint64_t n_entries) {
+ Object o;
+ uint64_t offset, sz, n_items = 0, n_unused;
+ int r;
+
+ if (n_entries == 0)
+ return 0;
+
+ for (uint64_t q = p; q != 0; q = le64toh(o.entry_array.next_entry_array_offset)) {
+ r = journal_file_read_object(f, OBJECT_ENTRY_ARRAY, q, &o);
+ if (r < 0)
+ return r;
+
+ n_items += journal_file_entry_array_n_items(&o);
+ p = q;
+ }
+
+ if (p == 0)
+ return 0;
+
+ if (n_entries > n_items)
+ return -EBADMSG;
+
+ /* Amount of unused items in the final entry array. */
+ n_unused = n_items - n_entries;
+
+ if (n_unused == 0)
+ return 0;
+
+ offset = p + offsetof(Object, entry_array.items) +
+ (journal_file_entry_array_n_items(&o) - n_unused) * sizeof(le64_t);
+ sz = p + le64toh(o.object.size) - offset;
+
+ if (fallocate(f->fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, offset, sz) < 0)
+ return log_debug_errno(errno, "Failed to punch hole in entry array of %s: %m", f->path);
+
+ return 0;
+}
+
+static int journald_file_punch_holes(JournalFile *f) {
+ HashItem items[4096 / sizeof(HashItem)];
+ uint64_t p, sz;
+ size_t to_read;
+ int r;
+
+ r = journald_file_entry_array_punch_hole(
+ f, le64toh(f->header->entry_array_offset), le64toh(f->header->n_entries));
+ if (r < 0)
+ return r;
+
+ p = le64toh(f->header->data_hash_table_offset);
+ sz = le64toh(f->header->data_hash_table_size);
+ to_read = MIN((size_t) f->last_stat.st_blksize, sizeof(items));
+
+ for (uint64_t i = p; i < p + sz; i += sizeof(items)) {
+ ssize_t n_read;
+
+ n_read = pread(f->fd, items, MIN(to_read, p + sz - i), i);
+ if (n_read < 0)
+ return n_read;
+
+ for (size_t j = 0; j < (size_t) n_read / sizeof(HashItem); j++) {
+ Object o;
+
+ for (uint64_t q = le64toh(items[j].head_hash_offset); q != 0;
+ q = le64toh(o.data.next_hash_offset)) {
+
+ r = journal_file_read_object(f, OBJECT_DATA, q, &o);
+ if (r < 0) {
+ log_debug_errno(r, "Invalid data object: %m, ignoring");
+ break;
+ }
+
+ if (le64toh(o.data.n_entries) == 0)
+ continue;
+
+ (void) journald_file_entry_array_punch_hole(
+ f, le64toh(o.data.entry_array_offset), le64toh(o.data.n_entries) - 1);
+ }
+ }
+ }
+
+ return 0;
+}
+
+/* This may be called from a separate thread to prevent blocking the caller for the duration of fsync().
+ * As a result we use atomic operations on f->offline_state for inter-thread communications with
+ * journal_file_set_offline() and journal_file_set_online(). */
+static void journald_file_set_offline_internal(JournaldFile *f) {
+ assert(f);
+ assert(f->file->fd >= 0);
+ assert(f->file->header);
+
+ for (;;) {
+ switch (f->file->offline_state) {
+ case OFFLINE_CANCEL:
+ if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_CANCEL, OFFLINE_DONE))
+ continue;
+ return;
+
+ case OFFLINE_AGAIN_FROM_SYNCING:
+ if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_AGAIN_FROM_SYNCING, OFFLINE_SYNCING))
+ continue;
+ break;
+
+ case OFFLINE_AGAIN_FROM_OFFLINING:
+ if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_AGAIN_FROM_OFFLINING, OFFLINE_SYNCING))
+ continue;
+ break;
+
+ case OFFLINE_SYNCING:
+ if (f->file->archive) {
+ (void) journald_file_truncate(f->file);
+ (void) journald_file_punch_holes(f->file);
+ }
+
+ (void) fsync(f->file->fd);
+
+ if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_SYNCING, OFFLINE_OFFLINING))
+ continue;
+
+ f->file->header->state = f->file->archive ? STATE_ARCHIVED : STATE_OFFLINE;
+ (void) fsync(f->file->fd);
+ break;
+
+ case OFFLINE_OFFLINING:
+ if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_OFFLINING, OFFLINE_DONE))
+ continue;
+ _fallthrough_;
+ case OFFLINE_DONE:
+ return;
+
+ case OFFLINE_JOINED:
+ log_debug("OFFLINE_JOINED unexpected offline state for journal_file_set_offline_internal()");
+ return;
+ }
+ }
+}
+
+static void * journald_file_set_offline_thread(void *arg) {
+ JournaldFile *f = arg;
+
+ (void) pthread_setname_np(pthread_self(), "journal-offline");
+
+ journald_file_set_offline_internal(f);
+
+ return NULL;
+}
+
+/* Trigger a restart if the offline thread is mid-flight in a restartable state. */
+static bool journald_file_set_offline_try_restart(JournaldFile *f) {
+ for (;;) {
+ switch (f->file->offline_state) {
+ case OFFLINE_AGAIN_FROM_SYNCING:
+ case OFFLINE_AGAIN_FROM_OFFLINING:
+ return true;
+
+ case OFFLINE_CANCEL:
+ if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_CANCEL, OFFLINE_AGAIN_FROM_SYNCING))
+ continue;
+ return true;
+
+ case OFFLINE_SYNCING:
+ if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_SYNCING, OFFLINE_AGAIN_FROM_SYNCING))
+ continue;
+ return true;
+
+ case OFFLINE_OFFLINING:
+ if (!__sync_bool_compare_and_swap(&f->file->offline_state, OFFLINE_OFFLINING, OFFLINE_AGAIN_FROM_OFFLINING))
+ continue;
+ return true;
+
+ default:
+ return false;
+ }
+ }
+}
+
+/* Sets a journal offline.
+ *
+ * If wait is false then an offline is dispatched in a separate thread for a
+ * subsequent journal_file_set_offline() or journal_file_set_online() of the
+ * same journal to synchronize with.
+ *
+ * If wait is true, then either an existing offline thread will be restarted
+ * and joined, or if none exists the offline is simply performed in this
+ * context without involving another thread.
+ */
+int journald_file_set_offline(JournaldFile *f, bool wait) {
+ int target_state;
+ bool restarted;
+ int r;
+
+ assert(f);
+
+ if (!f->file->writable)
+ return -EPERM;
+
+ if (f->file->fd < 0 || !f->file->header)
+ return -EINVAL;
+
+ target_state = f->file->archive ? STATE_ARCHIVED : STATE_OFFLINE;
+
+ /* An offlining journal is implicitly online and may modify f->header->state,
+ * we must also join any potentially lingering offline thread when already in
+ * the desired offline state.
+ */
+ if (!journald_file_is_offlining(f) && f->file->header->state == target_state)
+ return journal_file_set_offline_thread_join(f->file);
+
+ /* Restart an in-flight offline thread and wait if needed, or join a lingering done one. */
+ restarted = journald_file_set_offline_try_restart(f);
+ if ((restarted && wait) || !restarted) {
+ r = journal_file_set_offline_thread_join(f->file);
+ if (r < 0)
+ return r;
+ }
+
+ if (restarted)
+ return 0;
+
+ /* Initiate a new offline. */
+ f->file->offline_state = OFFLINE_SYNCING;
+
+ if (wait) /* Without using a thread if waiting. */
+ journald_file_set_offline_internal(f);
+ else {
+ sigset_t ss, saved_ss;
+ int k;
+
+ assert_se(sigfillset(&ss) >= 0);
+ /* Don't block SIGBUS since the offlining thread accesses a memory mapped file.
+ * Asynchronous SIGBUS signals can safely be handled by either thread. */
+ assert_se(sigdelset(&ss, SIGBUS) >= 0);
+
+ r = pthread_sigmask(SIG_BLOCK, &ss, &saved_ss);
+ if (r > 0)
+ return -r;
+
+ r = pthread_create(&f->file->offline_thread, NULL, journald_file_set_offline_thread, f);
+
+ k = pthread_sigmask(SIG_SETMASK, &saved_ss, NULL);
+ if (r > 0) {
+ f->file->offline_state = OFFLINE_JOINED;
+ return -r;
+ }
+ if (k > 0)
+ return -k;
+ }
+
+ return 0;
+}
+
+bool journald_file_is_offlining(JournaldFile *f) {
+ assert(f);
+
+ __sync_synchronize();
+
+ if (IN_SET(f->file->offline_state, OFFLINE_DONE, OFFLINE_JOINED))
+ return false;
+
+ return true;
+}
+
JournaldFile* journald_file_close(JournaldFile *f) {
if (!f)
return NULL;
+#if HAVE_GCRYPT
+ /* Write the final tag */
+ if (f->file->seal && f->file->writable) {
+ int r;
+
+ r = journal_file_append_tag(f->file);
+ if (r < 0)
+ log_error_errno(r, "Failed to append tag when closing journal: %m");
+ }
+#endif
+
+ if (f->file->post_change_timer) {
+ if (sd_event_source_get_enabled(f->file->post_change_timer, NULL) > 0)
+ journal_file_post_change(f->file);
+
+ sd_event_source_disable_unref(f->file->post_change_timer);
+ }
+
+ journald_file_set_offline(f, true);
+
journal_file_close(f->file);
return mfree(f);
if (r < 0)
log_debug_errno(r, "Failed to add file to deferred close set, closing immediately.");
else {
- (void) journal_file_set_offline(f->file, false);
+ (void) journald_file_set_offline(f, false);
return NULL;
}
}
JournaldFile *template,
JournaldFile **ret);
+int journald_file_set_offline(JournaldFile *f, bool wait);
+bool journald_file_is_offlining(JournaldFile *f);
JournaldFile* journald_file_close(JournaldFile *f);
DEFINE_TRIVIAL_CLEANUP_FUNC(JournaldFile*, journald_file_close);
/* Perform any deferred closes which aren't still offlining. */
SET_FOREACH(f, s->deferred_closes) {
- if (journal_file_is_offlining(f->file))
+ if (journald_file_is_offlining(f))
continue;
(void) set_remove(s->deferred_closes, f);
int r;
if (s->system_journal) {
- r = journal_file_set_offline(s->system_journal->file, false);
+ r = journald_file_set_offline(s->system_journal, false);
if (r < 0)
log_warning_errno(r, "Failed to sync system journal, ignoring: %m");
}
ORDERED_HASHMAP_FOREACH(f, s->user_journals) {
- r = journal_file_set_offline(f->file, false);
+ r = journald_file_set_offline(f, false);
if (r < 0)
log_warning_errno(r, "Failed to sync user journal, ignoring: %m");
}
'journal-core',
sources,
include_directories : includes,
+ dependencies: threads,
install : false)
journal_includes = [includes, include_directories('.')]
# pragma GCC diagnostic ignored "-Waddress-of-packed-member"
#endif
-static int journal_file_tail_end(JournalFile *f, uint64_t *ret_offset) {
+int journal_file_tail_end(JournalFile *f, uint64_t *ret_offset) {
Object tail;
uint64_t p;
int r;
return 0;
}
-static int journal_file_truncate(JournalFile *f) {
- uint64_t p;
- int r;
-
- /* truncate excess from the end of archives */
- r = journal_file_tail_end(f, &p);
- if (r < 0)
- return log_debug_errno(r, "Failed to determine end of tail object: %m");
-
- /* arena_size can't exceed the file size, ensure it's updated before truncating */
- f->header->arena_size = htole64(p - le64toh(f->header->header_size));
-
- if (ftruncate(f->fd, p) < 0)
- log_debug_errno(errno, "Failed to truncate %s: %m", f->path);
-
- return 0;
-}
-
-static int journal_file_entry_array_punch_hole(JournalFile *f, uint64_t p, uint64_t n_entries) {
- Object o;
- uint64_t offset, sz, n_items = 0, n_unused;
- int r;
-
- if (n_entries == 0)
- return 0;
-
- for (uint64_t q = p; q != 0; q = le64toh(o.entry_array.next_entry_array_offset)) {
- r = journal_file_read_object(f, OBJECT_ENTRY_ARRAY, q, &o);
- if (r < 0)
- return r;
-
- n_items += journal_file_entry_array_n_items(&o);
- p = q;
- }
-
- if (p == 0)
- return 0;
-
- if (n_entries > n_items)
- return -EBADMSG;
-
- /* Amount of unused items in the final entry array. */
- n_unused = n_items - n_entries;
-
- if (n_unused == 0)
- return 0;
-
- offset = p + offsetof(Object, entry_array.items) +
- (journal_file_entry_array_n_items(&o) - n_unused) * sizeof(le64_t);
- sz = p + le64toh(o.object.size) - offset;
-
- if (fallocate(f->fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, offset, sz) < 0)
- return log_debug_errno(errno, "Failed to punch hole in entry array of %s: %m", f->path);
-
- return 0;
-}
-
-static int journal_file_punch_holes(JournalFile *f) {
- HashItem items[4096 / sizeof(HashItem)];
- uint64_t p, sz;
- size_t to_read;
- int r;
-
- r = journal_file_entry_array_punch_hole(
- f, le64toh(f->header->entry_array_offset), le64toh(f->header->n_entries));
- if (r < 0)
- return r;
-
- p = le64toh(f->header->data_hash_table_offset);
- sz = le64toh(f->header->data_hash_table_size);
- to_read = MIN((size_t) f->last_stat.st_blksize, sizeof(items));
-
- for (uint64_t i = p; i < p + sz; i += sizeof(items)) {
- ssize_t n_read;
-
- n_read = pread(f->fd, items, MIN(to_read, p + sz - i), i);
- if (n_read < 0)
- return n_read;
-
- for (size_t j = 0; j < (size_t) n_read / sizeof(HashItem); j++) {
- Object o;
-
- for (uint64_t q = le64toh(items[j].head_hash_offset); q != 0;
- q = le64toh(o.data.next_hash_offset)) {
-
- r = journal_file_read_object(f, OBJECT_DATA, q, &o);
- if (r < 0) {
- log_debug_errno(r, "Invalid data object: %m, ignoring");
- break;
- }
-
- if (le64toh(o.data.n_entries) == 0)
- continue;
-
- (void) journal_file_entry_array_punch_hole(
- f, le64toh(o.data.entry_array_offset), le64toh(o.data.n_entries) - 1);
- }
- }
- }
-
- return 0;
-}
-
-/* This may be called from a separate thread to prevent blocking the caller for the duration of fsync().
- * As a result we use atomic operations on f->offline_state for inter-thread communications with
- * journal_file_set_offline() and journal_file_set_online(). */
-static void journal_file_set_offline_internal(JournalFile *f) {
- assert(f);
- assert(f->fd >= 0);
- assert(f->header);
-
- for (;;) {
- switch (f->offline_state) {
- case OFFLINE_CANCEL:
- if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_CANCEL, OFFLINE_DONE))
- continue;
- return;
-
- case OFFLINE_AGAIN_FROM_SYNCING:
- if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_SYNCING, OFFLINE_SYNCING))
- continue;
- break;
-
- case OFFLINE_AGAIN_FROM_OFFLINING:
- if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_OFFLINING, OFFLINE_SYNCING))
- continue;
- break;
-
- case OFFLINE_SYNCING:
- if (f->archive) {
- (void) journal_file_truncate(f);
- (void) journal_file_punch_holes(f);
- }
-
- (void) fsync(f->fd);
-
- if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_SYNCING, OFFLINE_OFFLINING))
- continue;
-
- f->header->state = f->archive ? STATE_ARCHIVED : STATE_OFFLINE;
- (void) fsync(f->fd);
- break;
-
- case OFFLINE_OFFLINING:
- if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_OFFLINING, OFFLINE_DONE))
- continue;
- _fallthrough_;
- case OFFLINE_DONE:
- return;
-
- case OFFLINE_JOINED:
- log_debug("OFFLINE_JOINED unexpected offline state for journal_file_set_offline_internal()");
- return;
- }
- }
-}
-
-static void * journal_file_set_offline_thread(void *arg) {
- JournalFile *f = arg;
-
- (void) pthread_setname_np(pthread_self(), "journal-offline");
-
- journal_file_set_offline_internal(f);
-
- return NULL;
-}
-
-static int journal_file_set_offline_thread_join(JournalFile *f) {
+int journal_file_set_offline_thread_join(JournalFile *f) {
int r;
assert(f);
return 0;
}
-/* Trigger a restart if the offline thread is mid-flight in a restartable state. */
-static bool journal_file_set_offline_try_restart(JournalFile *f) {
- for (;;) {
- switch (f->offline_state) {
- case OFFLINE_AGAIN_FROM_SYNCING:
- case OFFLINE_AGAIN_FROM_OFFLINING:
- return true;
-
- case OFFLINE_CANCEL:
- if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_CANCEL, OFFLINE_AGAIN_FROM_SYNCING))
- continue;
- return true;
-
- case OFFLINE_SYNCING:
- if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_SYNCING, OFFLINE_AGAIN_FROM_SYNCING))
- continue;
- return true;
-
- case OFFLINE_OFFLINING:
- if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_OFFLINING, OFFLINE_AGAIN_FROM_OFFLINING))
- continue;
- return true;
-
- default:
- return false;
- }
- }
-}
-
-/* Sets a journal offline.
- *
- * If wait is false then an offline is dispatched in a separate thread for a
- * subsequent journal_file_set_offline() or journal_file_set_online() of the
- * same journal to synchronize with.
- *
- * If wait is true, then either an existing offline thread will be restarted
- * and joined, or if none exists the offline is simply performed in this
- * context without involving another thread.
- */
-int journal_file_set_offline(JournalFile *f, bool wait) {
- int target_state;
- bool restarted;
- int r;
-
- assert(f);
-
- if (!f->writable)
- return -EPERM;
-
- if (f->fd < 0 || !f->header)
- return -EINVAL;
-
- target_state = f->archive ? STATE_ARCHIVED : STATE_OFFLINE;
-
- /* An offlining journal is implicitly online and may modify f->header->state,
- * we must also join any potentially lingering offline thread when already in
- * the desired offline state.
- */
- if (!journal_file_is_offlining(f) && f->header->state == target_state)
- return journal_file_set_offline_thread_join(f);
-
- /* Restart an in-flight offline thread and wait if needed, or join a lingering done one. */
- restarted = journal_file_set_offline_try_restart(f);
- if ((restarted && wait) || !restarted) {
- r = journal_file_set_offline_thread_join(f);
- if (r < 0)
- return r;
- }
-
- if (restarted)
- return 0;
-
- /* Initiate a new offline. */
- f->offline_state = OFFLINE_SYNCING;
-
- if (wait) /* Without using a thread if waiting. */
- journal_file_set_offline_internal(f);
- else {
- sigset_t ss, saved_ss;
- int k;
-
- assert_se(sigfillset(&ss) >= 0);
- /* Don't block SIGBUS since the offlining thread accesses a memory mapped file.
- * Asynchronous SIGBUS signals can safely be handled by either thread. */
- assert_se(sigdelset(&ss, SIGBUS) >= 0);
-
- r = pthread_sigmask(SIG_BLOCK, &ss, &saved_ss);
- if (r > 0)
- return -r;
-
- r = pthread_create(&f->offline_thread, NULL, journal_file_set_offline_thread, f);
-
- k = pthread_sigmask(SIG_SETMASK, &saved_ss, NULL);
- if (r > 0) {
- f->offline_state = OFFLINE_JOINED;
- return -r;
- }
- if (k > 0)
- return -k;
- }
-
- return 0;
-}
-
static int journal_file_set_online(JournalFile *f) {
bool wait = true;
}
}
-bool journal_file_is_offlining(JournalFile *f) {
- assert(f);
-
- __sync_synchronize();
-
- if (IN_SET(f->offline_state, OFFLINE_DONE, OFFLINE_JOINED))
- return false;
-
- return true;
-}
-
JournalFile* journal_file_close(JournalFile *f) {
if (!f)
return NULL;
-#if HAVE_GCRYPT
- /* Write the final tag */
- if (f->seal && f->writable) {
- int r;
-
- r = journal_file_append_tag(f);
- if (r < 0)
- log_error_errno(r, "Failed to append tag when closing journal: %m");
- }
-#endif
-
- if (f->post_change_timer) {
- if (sd_event_source_get_enabled(f->post_change_timer, NULL) > 0)
- journal_file_post_change(f);
-
- sd_event_source_disable_unref(f->post_change_timer);
- }
-
- journal_file_set_offline(f, true);
-
if (f->mmap && f->cache_fd)
mmap_cache_fd_free(f->cache_fd);
JournalFile *template,
JournalFile **ret);
-int journal_file_set_offline(JournalFile *f, bool wait);
-bool journal_file_is_offlining(JournalFile *f);
+int journal_file_set_offline_thread_join(JournalFile *f);
JournalFile* journal_file_close(JournalFile *j);
int journal_file_fstat(JournalFile *f);
DEFINE_TRIVIAL_CLEANUP_FUNC(JournalFile*, journal_file_close);
int journal_file_move_to_object(JournalFile *f, ObjectType type, uint64_t offset, Object **ret);
int journal_file_read_object(JournalFile *f, ObjectType type, uint64_t offset, Object *ret);
+int journal_file_tail_end(JournalFile *f, uint64_t *ret_offset);
+
uint64_t journal_file_entry_n_items(Object *o) _pure_;
uint64_t journal_file_entry_array_n_items(Object *o) _pure_;
uint64_t journal_file_hash_table_n_items(Object *o) _pure_;