-/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
-
/***
This file is part of systemd.
#include <errno.h>
#include <fcntl.h>
#include <linux/fs.h>
+#include <pthread.h>
#include <stddef.h>
#include <sys/mman.h>
#include <sys/statvfs.h>
#include "journal-file.h"
#include "lookup3.h"
#include "parse-util.h"
+#include "path-util.h"
#include "random-util.h"
#include "sd-event.h"
+#include "set.h"
#include "string-util.h"
#include "xattr-util.h"
/* The mmap context to use for the header we pick as one above the last defined typed */
#define CONTEXT_HEADER _OBJECT_TYPE_MAX
-static int journal_file_set_online(JournalFile *f) {
+/* This may be called from a separate thread to prevent blocking the caller for the duration of fsync().
+ * As a result we use atomic operations on f->offline_state for inter-thread communications with
+ * journal_file_set_offline() and journal_file_set_online(). */
+static void journal_file_set_offline_internal(JournalFile *f) {
assert(f);
+ assert(f->fd >= 0);
+ assert(f->header);
- if (!f->writable)
- return -EPERM;
+ for (;;) {
+ switch (f->offline_state) {
+ case OFFLINE_CANCEL:
+ if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_CANCEL, OFFLINE_DONE))
+ continue;
+ return;
- if (!(f->fd >= 0 && f->header))
- return -EINVAL;
+ case OFFLINE_AGAIN_FROM_SYNCING:
+ if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_SYNCING, OFFLINE_SYNCING))
+ continue;
+ break;
+
+ case OFFLINE_AGAIN_FROM_OFFLINING:
+ if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_OFFLINING, OFFLINE_SYNCING))
+ continue;
+ break;
+
+ case OFFLINE_SYNCING:
+ (void) fsync(f->fd);
+
+ if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_SYNCING, OFFLINE_OFFLINING))
+ continue;
+
+ f->header->state = f->archive ? STATE_ARCHIVED : STATE_OFFLINE;
+ (void) fsync(f->fd);
+ break;
+
+ case OFFLINE_OFFLINING:
+ if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_OFFLINING, OFFLINE_DONE))
+ continue;
+ /* fall through */
+
+ case OFFLINE_DONE:
+ return;
+
+ case OFFLINE_JOINED:
+ log_debug("OFFLINE_JOINED unexpected offline state for journal_file_set_offline_internal()");
+ return;
+ }
+ }
+}
+
+static void * journal_file_set_offline_thread(void *arg) {
+ JournalFile *f = arg;
+
+ journal_file_set_offline_internal(f);
+
+ return NULL;
+}
+
+static int journal_file_set_offline_thread_join(JournalFile *f) {
+ int r;
+
+ assert(f);
+
+ if (f->offline_state == OFFLINE_JOINED)
+ return 0;
+
+ r = pthread_join(f->offline_thread, NULL);
+ if (r)
+ return -r;
+
+ f->offline_state = OFFLINE_JOINED;
if (mmap_cache_got_sigbus(f->mmap, f->fd))
return -EIO;
- switch(f->header->state) {
- case STATE_ONLINE:
- return 0;
+ return 0;
+}
- case STATE_OFFLINE:
- f->header->state = STATE_ONLINE;
- fsync(f->fd);
- return 0;
+/* Trigger a restart if the offline thread is mid-flight in a restartable state. */
+static bool journal_file_set_offline_try_restart(JournalFile *f) {
+ for (;;) {
+ switch (f->offline_state) {
+ case OFFLINE_AGAIN_FROM_SYNCING:
+ case OFFLINE_AGAIN_FROM_OFFLINING:
+ return true;
+
+ case OFFLINE_CANCEL:
+ if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_CANCEL, OFFLINE_AGAIN_FROM_SYNCING))
+ continue;
+ return true;
+
+ case OFFLINE_SYNCING:
+ if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_SYNCING, OFFLINE_AGAIN_FROM_SYNCING))
+ continue;
+ return true;
+
+ case OFFLINE_OFFLINING:
+ if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_OFFLINING, OFFLINE_AGAIN_FROM_OFFLINING))
+ continue;
+ return true;
default:
- return -EINVAL;
+ return false;
+ }
}
}
-int journal_file_set_offline(JournalFile *f) {
+/* Sets a journal offline.
+ *
+ * If wait is false then an offline is dispatched in a separate thread for a
+ * subsequent journal_file_set_offline() or journal_file_set_online() of the
+ * same journal to synchronize with.
+ *
+ * If wait is true, then either an existing offline thread will be restarted
+ * and joined, or if none exists the offline is simply performed in this
+ * context without involving another thread.
+ */
+int journal_file_set_offline(JournalFile *f, bool wait) {
+ bool restarted;
+ int r;
+
assert(f);
if (!f->writable)
if (!(f->fd >= 0 && f->header))
return -EINVAL;
- if (f->header->state != STATE_ONLINE)
+ /* An offlining journal is implicitly online and may modify f->header->state,
+ * we must also join any potentially lingering offline thread when not online. */
+ if (!journal_file_is_offlining(f) && f->header->state != STATE_ONLINE)
+ return journal_file_set_offline_thread_join(f);
+
+ /* Restart an in-flight offline thread and wait if needed, or join a lingering done one. */
+ restarted = journal_file_set_offline_try_restart(f);
+ if ((restarted && wait) || !restarted) {
+ r = journal_file_set_offline_thread_join(f);
+ if (r < 0)
+ return r;
+ }
+
+ if (restarted)
return 0;
- fsync(f->fd);
+ /* Initiate a new offline. */
+ f->offline_state = OFFLINE_SYNCING;
- if (mmap_cache_got_sigbus(f->mmap, f->fd))
- return -EIO;
+ if (wait) /* Without using a thread if waiting. */
+ journal_file_set_offline_internal(f);
+ else {
+ r = pthread_create(&f->offline_thread, NULL, journal_file_set_offline_thread, f);
+ if (r > 0) {
+ f->offline_state = OFFLINE_JOINED;
+ return -r;
+ }
+ }
- f->header->state = STATE_OFFLINE;
+ return 0;
+}
+
+static int journal_file_set_online(JournalFile *f) {
+ bool joined = false;
+
+ assert(f);
+
+ if (!f->writable)
+ return -EPERM;
+
+ if (!(f->fd >= 0 && f->header))
+ return -EINVAL;
+
+ while (!joined) {
+ switch (f->offline_state) {
+ case OFFLINE_JOINED:
+ /* No offline thread, no need to wait. */
+ joined = true;
+ break;
+
+ case OFFLINE_SYNCING:
+ if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_SYNCING, OFFLINE_CANCEL))
+ continue;
+ /* Canceled syncing prior to offlining, no need to wait. */
+ break;
+
+ case OFFLINE_AGAIN_FROM_SYNCING:
+ if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_SYNCING, OFFLINE_CANCEL))
+ continue;
+ /* Canceled restart from syncing, no need to wait. */
+ break;
+
+ case OFFLINE_AGAIN_FROM_OFFLINING:
+ if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_OFFLINING, OFFLINE_CANCEL))
+ continue;
+ /* Canceled restart from offlining, must wait for offlining to complete however. */
+
+ /* fall through to wait */
+ default: {
+ int r;
+
+ r = journal_file_set_offline_thread_join(f);
+ if (r < 0)
+ return r;
+
+ joined = true;
+ break;
+ }
+ }
+ }
if (mmap_cache_got_sigbus(f->mmap, f->fd))
return -EIO;
- fsync(f->fd);
+ switch (f->header->state) {
+ case STATE_ONLINE:
+ return 0;
- return 0;
+ case STATE_OFFLINE:
+ f->header->state = STATE_ONLINE;
+ (void) fsync(f->fd);
+ return 0;
+
+ default:
+ return -EINVAL;
+ }
+}
+
+bool journal_file_is_offlining(JournalFile *f) {
+ assert(f);
+
+ __sync_synchronize();
+
+ if (f->offline_state == OFFLINE_DONE ||
+ f->offline_state == OFFLINE_JOINED)
+ return false;
+
+ return true;
}
JournalFile* journal_file_close(JournalFile *f) {
sd_event_source_unref(f->post_change_timer);
}
- journal_file_set_offline(f);
+ journal_file_set_offline(f, true);
if (f->mmap && f->fd >= 0)
mmap_cache_close_fd(f->mmap, f->fd);
(void) btrfs_defrag_fd(f->fd);
}
- safe_close(f->fd);
+ if (f->close_fd)
+ safe_close(f->fd);
free(f->path);
mmap_cache_unref(f->mmap);
return NULL;
}
+void journal_file_close_set(Set *s) {
+ JournalFile *f;
+
+ assert(s);
+
+ while ((f = set_steal_first(s)))
+ (void) journal_file_close(f);
+}
+
static int journal_file_init_header(JournalFile *f, JournalFile *template) {
Header h = {};
ssize_t k;
int r;
assert(f);
+ assert(f->header);
r = sd_id128_get_machine(&f->header->machine_id);
if (r < 0)
r = journal_file_set_online(f);
/* Sync the online state to disk */
- fsync(f->fd);
+ (void) fsync(f->fd);
return r;
}
uint32_t flags;
assert(f);
+ assert(f->header);
if (memcmp(f->header->signature, HEADER_SIGNATURE, 8))
return -EBADMSG;
int r;
assert(f);
+ assert(f->header);
/* We assume that this file is not sparse, and we know that
* for sure, since we always call posix_fallocate()
/* Objects may only be located at multiple of 64 bit */
if (!VALID64(offset))
- return -EFAULT;
+ return -EBADMSG;
+
+ /* Object may not be located in the file header */
+ if (offset < le64toh(f->header->header_size))
+ return -EBADMSG;
r = journal_file_move_to(f, type, false, offset, sizeof(ObjectHeader), &t);
if (r < 0)
uint64_t r;
assert(f);
+ assert(f->header);
r = le64toh(f->header->tail_entry_seqnum) + 1;
void *t;
assert(f);
+ assert(f->header);
assert(type > OBJECT_UNUSED && type < _OBJECT_TYPE_MAX);
assert(size >= sizeof(ObjectHeader));
assert(offset);
int r;
assert(f);
+ assert(f->header);
/* We estimate that we need 1 hash table entry per 768 bytes
of journal file and we want to make sure we never get
int r;
assert(f);
+ assert(f->header);
/* We use a fixed size hash table for the fields as this
* number should grow very slowly only */
int r;
assert(f);
+ assert(f->header);
if (f->data_hash_table)
return 0;
int r;
assert(f);
+ assert(f->header);
if (f->field_hash_table)
return 0;
int r;
assert(f);
+ assert(f->header);
+ assert(f->field_hash_table);
assert(o);
assert(offset > 0);
int r;
assert(f);
+ assert(f->header);
+ assert(f->data_hash_table);
assert(o);
assert(offset > 0);
int r;
assert(f);
+ assert(f->header);
assert(field && size > 0);
/* If the field hash table is empty, we can't find anything */
int r;
assert(f);
+ assert(f->header);
assert(data || size == 0);
/* If there's no data hash table, then there's no entry. */
}
#endif
- if (compression == 0 && size > 0)
- memcpy(o->data.payload, data, size);
+ if (compression == 0)
+ memcpy_safe(o->data.payload, data, size);
r = journal_file_link_data(f, o, p, hash);
if (r < 0)
Object *o;
assert(f);
+ assert(f->header);
assert(first);
assert(idx);
assert(p > 0);
int r;
assert(f);
+ assert(f->header);
assert(o);
assert(offset > 0);
int r;
assert(f);
+ assert(f->header);
assert(items || n_items == 0);
assert(ts);
return r;
o->entry.seqnum = htole64(journal_file_entry_seqnum(f, seqnum));
- memcpy(o->entry.items, items, n_items * sizeof(EntryItem));
+ memcpy_safe(o->entry.items, items, n_items * sizeof(EntryItem));
o->entry.realtime = htole64(ts->realtime);
o->entry.monotonic = htole64(ts->monotonic);
o->entry.xor_hash = htole64(xor_hash);
struct dual_timestamp _ts;
assert(f);
+ assert(f->header);
assert(iovec || n_iovec == 0);
if (!ts) {
ts = &_ts;
}
- if (f->tail_entry_monotonic_valid &&
- ts->monotonic < le64toh(f->header->tail_entry_monotonic))
- return -EINVAL;
-
#ifdef HAVE_GCRYPT
r = journal_file_maybe_append_tag(f, ts->realtime);
if (r < 0)
i = right - 1;
lp = p = le64toh(array->entry_array.items[i]);
if (p <= 0)
- return -EBADMSG;
-
- r = test_object(f, p, needle);
+ r = -EBADMSG;
+ else
+ r = test_object(f, p, needle);
+ if (r == -EBADMSG) {
+ log_debug_errno(r, "Encountered invalid entry while bisecting, cutting algorithm short. (1)");
+ n = i;
+ continue;
+ }
if (r < 0)
return r;
p = le64toh(array->entry_array.items[i]);
if (p <= 0)
- return -EBADMSG;
-
- r = test_object(f, p, needle);
+ r = -EBADMSG;
+ else
+ r = test_object(f, p, needle);
+ if (r == -EBADMSG) {
+ log_debug_errno(r, "Encountered invalid entry while bisecting, cutting algorithm short. (2)");
+ right = n = i;
+ continue;
+ }
if (r < 0)
return r;
goto found;
if (r > 0 && idx)
- (*idx) ++;
+ (*idx)++;
return r;
direction_t direction,
Object **ret,
uint64_t *offset) {
+ assert(f);
+ assert(f->header);
return generic_array_bisect(f,
le64toh(f->header->entry_array_offset),
direction_t direction,
Object **ret,
uint64_t *offset) {
+ assert(f);
+ assert(f->header);
return generic_array_bisect(f,
le64toh(f->header->entry_array_offset),
int journal_file_compare_locations(JournalFile *af, JournalFile *bf) {
assert(af);
+ assert(af->header);
assert(bf);
+ assert(bf->header);
assert(af->location_type == LOCATION_SEEK);
assert(bf->location_type == LOCATION_SEEK);
int r;
assert(f);
+ assert(f->header);
n = le64toh(f->header->n_entries);
if (n <= 0)
le64toh(f->header->entry_array_offset),
i,
ret, &ofs);
+ if (r == -EBADMSG && direction == DIRECTION_DOWN) {
+ /* Special case: when we iterate throught the journal file linearly, and hit an entry we can't read,
+ * consider this the end of the journal file. */
+ log_debug_errno(r, "Encountered entry we can't read while iterating through journal file. Considering this the end of the file.");
+ return 0;
+ }
if (r <= 0)
return r;
if (p > 0 &&
(direction == DIRECTION_DOWN ? ofs <= p : ofs >= p)) {
- log_debug("%s: entry array corrupted at entry %"PRIu64,
- f->path, i);
+ log_debug("%s: entry array corrupted at entry %" PRIu64, f->path, i);
return -EBADMSG;
}
uint64_t p;
assert(f);
+ assert(f->header);
journal_file_print_header(f);
char bytes[FORMAT_BYTES_MAX];
assert(f);
+ assert(f->header);
printf("File Path: %s\n"
"File ID: %s\n"
"Data Hash Table Size: %"PRIu64"\n"
"Field Hash Table Size: %"PRIu64"\n"
"Rotate Suggested: %s\n"
- "Head Sequential Number: %"PRIu64"\n"
- "Tail Sequential Number: %"PRIu64"\n"
- "Head Realtime Timestamp: %s\n"
- "Tail Realtime Timestamp: %s\n"
- "Tail Monotonic Timestamp: %s\n"
+ "Head Sequential Number: %"PRIu64" (%"PRIx64")\n"
+ "Tail Sequential Number: %"PRIu64" (%"PRIx64")\n"
+ "Head Realtime Timestamp: %s (%"PRIx64")\n"
+ "Tail Realtime Timestamp: %s (%"PRIx64")\n"
+ "Tail Monotonic Timestamp: %s (%"PRIx64")\n"
"Objects: %"PRIu64"\n"
"Entry Objects: %"PRIu64"\n",
f->path,
le64toh(f->header->data_hash_table_size) / sizeof(HashItem),
le64toh(f->header->field_hash_table_size) / sizeof(HashItem),
yes_no(journal_file_rotate_suggested(f, 0)),
- le64toh(f->header->head_entry_seqnum),
- le64toh(f->header->tail_entry_seqnum),
- format_timestamp_safe(x, sizeof(x), le64toh(f->header->head_entry_realtime)),
- format_timestamp_safe(y, sizeof(y), le64toh(f->header->tail_entry_realtime)),
- format_timespan(z, sizeof(z), le64toh(f->header->tail_entry_monotonic), USEC_PER_MSEC),
+ le64toh(f->header->head_entry_seqnum), le64toh(f->header->head_entry_seqnum),
+ le64toh(f->header->tail_entry_seqnum), le64toh(f->header->tail_entry_seqnum),
+ format_timestamp_safe(x, sizeof(x), le64toh(f->header->head_entry_realtime)), le64toh(f->header->head_entry_realtime),
+ format_timestamp_safe(y, sizeof(y), le64toh(f->header->tail_entry_realtime)), le64toh(f->header->tail_entry_realtime),
+ format_timespan(z, sizeof(z), le64toh(f->header->tail_entry_monotonic), USEC_PER_MSEC), le64toh(f->header->tail_entry_monotonic),
le64toh(f->header->n_objects),
le64toh(f->header->n_entries));
}
int journal_file_open(
+ int fd,
const char *fname,
int flags,
mode_t mode,
bool seal,
JournalMetrics *metrics,
MMapCache *mmap_cache,
+ Set *deferred_closes,
JournalFile *template,
JournalFile **ret) {
void *h;
int r;
- assert(fname);
assert(ret);
+ assert(fd >= 0 || fname);
if ((flags & O_ACCMODE) != O_RDONLY &&
(flags & O_ACCMODE) != O_RDWR)
return -EINVAL;
- if (!endswith(fname, ".journal") &&
- !endswith(fname, ".journal~"))
- return -EINVAL;
+ if (fname) {
+ if (!endswith(fname, ".journal") &&
+ !endswith(fname, ".journal~"))
+ return -EINVAL;
+ }
f = new0(JournalFile, 1);
if (!f)
return -ENOMEM;
- f->fd = -1;
+ f->fd = fd;
f->mode = mode;
f->flags = flags;
}
}
- f->path = strdup(fname);
+ if (fname)
+ f->path = strdup(fname);
+ else /* If we don't know the path, fill in something explanatory and vaguely useful */
+ asprintf(&f->path, "/proc/self/%i", fd);
if (!f->path) {
r = -ENOMEM;
goto fail;
goto fail;
}
- f->fd = open(f->path, f->flags|O_CLOEXEC, f->mode);
if (f->fd < 0) {
- r = -errno;
- goto fail;
+ f->fd = open(f->path, f->flags|O_CLOEXEC, f->mode);
+ if (f->fd < 0) {
+ r = -errno;
+ goto fail;
+ }
+
+ /* fds we opened here by us should also be closed by us. */
+ f->close_fd = true;
}
r = journal_file_fstat(f);
f->header = h;
if (!newly_created) {
+ if (deferred_closes)
+ journal_file_close_set(deferred_closes);
+
r = journal_file_verify_header(f);
if (r < 0)
goto fail;
goto fail;
}
+ /* The file is opened now successfully, thus we take possesion of any passed in fd. */
+ f->close_fd = true;
+
*ret = f;
return 0;
if (f->fd >= 0 && mmap_cache_got_sigbus(f->mmap, f->fd))
r = -EIO;
- journal_file_close(f);
+ (void) journal_file_close(f);
return r;
}
-int journal_file_rotate(JournalFile **f, bool compress, bool seal) {
+int journal_file_rotate(JournalFile **f, bool compress, bool seal, Set *deferred_closes) {
_cleanup_free_ char *p = NULL;
size_t l;
JournalFile *old_file, *new_file = NULL;
if (!old_file->writable)
return -EINVAL;
+ /* Is this a journal file that was passed to us as fd? If so, we synthesized a path name for it, and we refuse
+ * rotation, since we don't know the actual path, and couldn't rename the file hence.*/
+ if (path_startswith(old_file->path, "/proc/self/fd"))
+ return -EINVAL;
+
if (!endswith(old_file->path, ".journal"))
return -EINVAL;
if (r < 0 && errno != ENOENT)
return -errno;
- old_file->header->state = STATE_ARCHIVED;
+ /* Set as archive so offlining commits w/state=STATE_ARCHIVED.
+ * Previously we would set old_file->header->state to STATE_ARCHIVED directly here,
+ * but journal_file_set_offline() short-circuits when state != STATE_ONLINE, which
+ * would result in the rotated journal never getting fsync() called before closing.
+ * Now we simply queue the archive state by setting an archive bit, leaving the state
+ * as STATE_ONLINE so proper offlining occurs. */
+ old_file->archive = true;
/* Currently, btrfs is not very good with out write patterns
* and fragments heavily. Let's defrag our journal files when
* we archive them */
old_file->defrag_on_close = true;
- r = journal_file_open(old_file->path, old_file->flags, old_file->mode, compress, seal, NULL, old_file->mmap, old_file, &new_file);
- journal_file_close(old_file);
+ r = journal_file_open(-1, old_file->path, old_file->flags, old_file->mode, compress, seal, NULL, old_file->mmap, deferred_closes, old_file, &new_file);
+
+ if (deferred_closes &&
+ set_put(deferred_closes, old_file) >= 0)
+ (void) journal_file_set_offline(old_file, false);
+ else
+ (void) journal_file_close(old_file);
*f = new_file;
return r;
bool seal,
JournalMetrics *metrics,
MMapCache *mmap_cache,
+ Set *deferred_closes,
JournalFile *template,
JournalFile **ret) {
size_t l;
_cleanup_free_ char *p = NULL;
- r = journal_file_open(fname, flags, mode, compress, seal, metrics, mmap_cache, template, ret);
+ r = journal_file_open(-1, fname, flags, mode, compress, seal, metrics, mmap_cache, deferred_closes, template, ret);
if (!IN_SET(r,
-EBADMSG, /* corrupted */
-ENODATA, /* truncated */
log_warning_errno(r, "File %s corrupted or uncleanly shut down, renaming and replacing.", fname);
- return journal_file_open(fname, flags, mode, compress, seal, metrics, mmap_cache, template, ret);
+ return journal_file_open(-1, fname, flags, mode, compress, seal, metrics, mmap_cache, deferred_closes, template, ret);
}
int journal_file_copy_entry(JournalFile *from, JournalFile *to, Object *o, uint64_t p, uint64_t *seqnum, Object **ret, uint64_t *offset) {
int journal_file_get_cutoff_realtime_usec(JournalFile *f, usec_t *from, usec_t *to) {
assert(f);
+ assert(f->header);
assert(from || to);
if (from) {
bool journal_file_rotate_suggested(JournalFile *f, usec_t max_file_usec) {
assert(f);
+ assert(f->header);
/* If we gained new header fields we gained new features,
* hence suggest a rotation */