]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
journal: asynchronous journal_file_set_offline()
authorVito Caputo <vito.caputo@coreos.com>
Fri, 12 Feb 2016 12:59:57 +0000 (04:59 -0800)
committerVito Caputo <vito.caputo@coreos.com>
Sat, 20 Feb 2016 02:50:20 +0000 (18:50 -0800)
This adds a wait flag to journal_file_set_offline(), when false the offline is
performed asynchronously in a separate thread.

When wait is true, if an asynchronous offline is already in-progress it is
restarted and waited for.  Otherwise the offline is performed synchronously
without the use of a thread.

journal_file_set_online() cancels or waits for the asynchronous offline to
complete if in-flight, depending on where in the offline process the thread
happens to be.  If the thread is in the fsync() phase, it is cancelled and
waiting is unnecessary.  Otherwise, the thread is joined before proceeding.

A new offline_state member is added to JournalFile which is used via
atomic operations for communicating between the offline thread and the
journal_file_set_{offline,online}() functions.

src/journal/journal-file.c
src/journal/journal-file.h
src/journal/journald-server.c

index 52110aa498174b8b106a1f91693aaf077488ee94..96be339d5ba9d727e7ce4fb29fcc8f964afd0ec1 100644 (file)
@@ -20,6 +20,7 @@
 #include <errno.h>
 #include <fcntl.h>
 #include <linux/fs.h>
+#include <pthread.h>
 #include <stddef.h>
 #include <sys/mman.h>
 #include <sys/statvfs.h>
 /* The mmap context to use for the header we pick as one above the last defined typed */
 #define CONTEXT_HEADER _OBJECT_TYPE_MAX
 
-static int journal_file_set_online(JournalFile *f) {
+/* This may be called from a separate thread to prevent blocking the caller for the duration of fsync().
+ * As a result we use atomic operations on f->offline_state for inter-thread communications with
+ * journal_file_set_offline() and journal_file_set_online(). */
+static void journal_file_set_offline_internal(JournalFile *f) {
         assert(f);
+        assert(f->fd >= 0);
+        assert(f->header);
 
-        if (!f->writable)
-                return -EPERM;
+        for (;;) {
+                switch (f->offline_state) {
+                case OFFLINE_CANCEL:
+                        if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_CANCEL, OFFLINE_DONE))
+                                continue;
+                        return;
 
-        if (!(f->fd >= 0 && f->header))
-                return -EINVAL;
+                case OFFLINE_AGAIN_FROM_SYNCING:
+                        if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_SYNCING, OFFLINE_SYNCING))
+                                continue;
+                        break;
+
+                case OFFLINE_AGAIN_FROM_OFFLINING:
+                        if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_OFFLINING, OFFLINE_SYNCING))
+                                continue;
+                        break;
+
+                case OFFLINE_SYNCING:
+                        (void) fsync(f->fd);
+
+                        if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_SYNCING, OFFLINE_OFFLINING))
+                                continue;
+
+                        f->header->state = STATE_OFFLINE;
+                        (void) fsync(f->fd);
+                        break;
+
+                case OFFLINE_OFFLINING:
+                        if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_OFFLINING, OFFLINE_DONE))
+                                continue;
+                        /* fall through */
+
+                case OFFLINE_DONE:
+                        return;
+
+                case OFFLINE_JOINED:
+                        log_debug("OFFLINE_JOINED unexpected offline state for journal_file_set_offline_internal()");
+                        return;
+                }
+        }
+}
+
+static void * journal_file_set_offline_thread(void *arg) {
+        JournalFile *f = arg;
+
+        journal_file_set_offline_internal(f);
+
+        return NULL;
+}
+
+static int journal_file_set_offline_thread_join(JournalFile *f) {
+        int r;
+
+        assert(f);
+
+        if (f->offline_state == OFFLINE_JOINED)
+                return 0;
+
+        r = pthread_join(f->offline_thread, NULL);
+        if (r)
+                return -r;
+
+        f->offline_state = OFFLINE_JOINED;
 
         if (mmap_cache_got_sigbus(f->mmap, f->fd))
                 return -EIO;
 
-        switch (f->header->state) {
-                case STATE_ONLINE:
-                        return 0;
+        return 0;
+}
 
-                case STATE_OFFLINE:
-                        f->header->state = STATE_ONLINE;
-                        (void) fsync(f->fd);
-                        return 0;
+/* Trigger a restart if the offline thread is mid-flight in a restartable state. */
+static bool journal_file_set_offline_try_restart(JournalFile *f) {
+        for (;;) {
+                switch (f->offline_state) {
+                case OFFLINE_AGAIN_FROM_SYNCING:
+                case OFFLINE_AGAIN_FROM_OFFLINING:
+                        return true;
+
+                case OFFLINE_CANCEL:
+                        if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_CANCEL, OFFLINE_AGAIN_FROM_SYNCING))
+                                continue;
+                        return true;
+
+                case OFFLINE_SYNCING:
+                        if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_SYNCING, OFFLINE_AGAIN_FROM_SYNCING))
+                                continue;
+                        return true;
+
+                case OFFLINE_OFFLINING:
+                        if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_OFFLINING, OFFLINE_AGAIN_FROM_OFFLINING))
+                                continue;
+                        return true;
 
                 default:
-                        return -EINVAL;
+                        return false;
+                }
         }
 }
 
-int journal_file_set_offline(JournalFile *f) {
+/* Sets a journal offline.
+ *
+ * If wait is false then an offline is dispatched in a separate thread for a
+ * subsequent journal_file_set_offline() or journal_file_set_online() of the
+ * same journal to synchronize with.
+ *
+ * If wait is true, then either an existing offline thread will be restarted
+ * and joined, or if none exists the offline is simply performed in this
+ * context without involving another thread.
+ */
+int journal_file_set_offline(JournalFile *f, bool wait) {
+        bool restarted;
+        int r;
+
         assert(f);
 
         if (!f->writable)
@@ -124,19 +219,95 @@ int journal_file_set_offline(JournalFile *f) {
         if (f->header->state != STATE_ONLINE)
                 return 0;
 
-        (void) fsync(f->fd);
+        /* Restart an in-flight offline thread and wait if needed, or join a lingering done one. */
+        restarted = journal_file_set_offline_try_restart(f);
+        if ((restarted && wait) || !restarted) {
+                r = journal_file_set_offline_thread_join(f);
+                if (r < 0)
+                        return r;
+        }
 
-        if (mmap_cache_got_sigbus(f->mmap, f->fd))
-                return -EIO;
+        if (restarted)
+                return 0;
+
+        /* Initiate a new offline. */
+        f->offline_state = OFFLINE_SYNCING;
+
+        if (wait) /* Without using a thread if waiting. */
+                journal_file_set_offline_internal(f);
+        else {
+                r = pthread_create(&f->offline_thread, NULL, journal_file_set_offline_thread, f);
+                if (r > 0)
+                        return -r;
+        }
+
+        return 0;
+}
+
+static int journal_file_set_online(JournalFile *f) {
+        bool joined = false;
+
+        assert(f);
+
+        if (!f->writable)
+                return -EPERM;
+
+        if (!(f->fd >= 0 && f->header))
+                return -EINVAL;
 
-        f->header->state = STATE_OFFLINE;
+        while (!joined) {
+                switch (f->offline_state) {
+                case OFFLINE_JOINED:
+                        /* No offline thread, no need to wait. */
+                        joined = true;
+                        break;
+
+                case OFFLINE_SYNCING:
+                        if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_SYNCING, OFFLINE_CANCEL))
+                                continue;
+                        /* Canceled syncing prior to offlining, no need to wait. */
+                        break;
+
+                case OFFLINE_AGAIN_FROM_SYNCING:
+                        if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_SYNCING, OFFLINE_CANCEL))
+                                continue;
+                        /* Canceled restart from syncing, no need to wait. */
+                        break;
+
+                case OFFLINE_AGAIN_FROM_OFFLINING:
+                        if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_OFFLINING, OFFLINE_CANCEL))
+                                continue;
+                        /* Canceled restart from offlining, must wait for offlining to complete however. */
+
+                        /* fall through to wait */
+                default: {
+                        int r;
+
+                        r = journal_file_set_offline_thread_join(f);
+                        if (r < 0)
+                                return r;
+
+                        joined = true;
+                        break;
+                }
+                }
+        }
 
         if (mmap_cache_got_sigbus(f->mmap, f->fd))
                 return -EIO;
 
-        (void) fsync(f->fd);
+        switch (f->header->state) {
+                case STATE_ONLINE:
+                        return 0;
 
-        return 0;
+                case STATE_OFFLINE:
+                        f->header->state = STATE_ONLINE;
+                        (void) fsync(f->fd);
+                        return 0;
+
+                default:
+                        return -EINVAL;
+        }
 }
 
 JournalFile* journal_file_close(JournalFile *f) {
@@ -159,7 +330,7 @@ JournalFile* journal_file_close(JournalFile *f) {
                 sd_event_source_unref(f->post_change_timer);
         }
 
-        journal_file_set_offline(f);
+        journal_file_set_offline(f, true);
 
         if (f->mmap && f->fd >= 0)
                 mmap_cache_close_fd(f->mmap, f->fd);
index 07b9561b8ace8f1bce78d1c5e3dd8b75e80ed6a9..fad4f78bdcb2ee15d59ff076ba525d37fae9ae20 100644 (file)
@@ -63,6 +63,16 @@ typedef enum LocationType {
         LOCATION_SEEK
 } LocationType;
 
+typedef enum OfflineState {
+        OFFLINE_JOINED,
+        OFFLINE_SYNCING,
+        OFFLINE_OFFLINING,
+        OFFLINE_CANCEL,
+        OFFLINE_AGAIN_FROM_SYNCING,
+        OFFLINE_AGAIN_FROM_OFFLINING,
+        OFFLINE_DONE
+} OfflineState;
+
 typedef struct JournalFile {
         int fd;
 
@@ -105,6 +115,9 @@ typedef struct JournalFile {
 
         OrderedHashmap *chain_cache;
 
+        pthread_t offline_thread;
+        volatile OfflineState offline_state;
+
 #if defined(HAVE_XZ) || defined(HAVE_LZ4)
         void *compress_buffer;
         size_t compress_buffer_size;
@@ -139,7 +152,7 @@ int journal_file_open(
                 JournalFile *template,
                 JournalFile **ret);
 
-int journal_file_set_offline(JournalFile *f);
+int journal_file_set_offline(JournalFile *f, bool wait);
 JournalFile* journal_file_close(JournalFile *j);
 
 int journal_file_open_reliably(
index 5e120fdac08cec659ed6287051a5f6b35f31c22f..ac992a8b54bc371d0319ae6a006d01ab2a00d5b4 100644 (file)
@@ -372,13 +372,13 @@ void server_sync(Server *s) {
         int r;
 
         if (s->system_journal) {
-                r = journal_file_set_offline(s->system_journal);
+                r = journal_file_set_offline(s->system_journal, false);
                 if (r < 0)
                         log_warning_errno(r, "Failed to sync system journal, ignoring: %m");
         }
 
         ORDERED_HASHMAP_FOREACH(f, s->user_journals, i) {
-                r = journal_file_set_offline(f);
+                r = journal_file_set_offline(f, false);
                 if (r < 0)
                         log_warning_errno(r, "Failed to sync user journal, ignoring: %m");
         }