]> git.ipfire.org Git - thirdparty/kernel/linux.git/commitdiff
dlm: add new struct to save position in dlm_copy_master_names
author: Alexander Aring <aahringo@redhat.com>
Tue, 2 Apr 2024 19:18:02 +0000 (15:18 -0400)
committer: David Teigland <teigland@redhat.com>
Tue, 9 Apr 2024 16:44:49 +0000 (11:44 -0500)
Add a new struct to save the current position in the rsb masters_list
while sending the rsb names to other nodes. The rsb names are sent in
multiple chunks, and for each new chunk, the new "dlm_dir_dump" struct
saves the last position in the masters_list. The new struct is also
used to save more information to sanity check the recovery process.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
fs/dlm/dir.c
fs/dlm/dlm_internal.h
fs/dlm/lockspace.c
fs/dlm/recoverd.c

index 3da00c46cbb3bf669a42165fe05ca1d53fd348bf..0dc8a1d9e41145b6e5b45876ab723a4884d4217e 100644 (file)
@@ -224,6 +224,80 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
        return NULL;
 }
 
+struct dlm_dir_dump {
+       /* recovery seq and requesting nodeid recorded when the dump starts;
+        * seq_init must still equal ls_recover_seq on later chunks. Sanity check only.
+        */
+       uint64_t seq_init;
+       uint64_t nodeid_init;
+       /* masters_list position recorded by the previous chunk; compared with
+        * the rsb looked up for the next chunk, just a sanity check.
+        */
+       struct list_head *last;
+
+       unsigned int sent_res; /* rsb names copied out so far, for log info */
+       unsigned int sent_msg; /* chunks completed so far, for log info */
+
+       struct list_head list; /* entry in ls->ls_dir_dump_list */
+};
+
+static void drop_dir_ctx(struct dlm_ls *ls, int nodeid) /* free every dump context recorded for nodeid */
+{
+       struct dlm_dir_dump *dd, *safe; /* safe holds the next entry so dd can be freed mid-walk */
+
+       write_lock(&ls->ls_dir_dump_lock); /* write side: entries are unlinked below */
+       list_for_each_entry_safe(dd, safe, &ls->ls_dir_dump_list, list) {
+               if (dd->nodeid_init == nodeid) { /* no break: all matching entries are dropped */
+                       log_error(ls, "drop dump seq %llu",
+                                (unsigned long long)dd->seq_init);
+                       list_del(&dd->list);
+                       kfree(dd);
+               }
+       }
+       write_unlock(&ls->ls_dir_dump_lock);
+}
+
+static struct dlm_dir_dump *lookup_dir_dump(struct dlm_ls *ls, int nodeid) /* first dump context for nodeid, or NULL */
+{
+       struct dlm_dir_dump *iter, *dd = NULL; /* dd stays NULL when no entry matches */
+
+       read_lock(&ls->ls_dir_dump_lock);
+       list_for_each_entry(iter, &ls->ls_dir_dump_list, list) {
+               if (iter->nodeid_init == nodeid) {
+                       dd = iter;
+                       break;
+               }
+       }
+       read_unlock(&ls->ls_dir_dump_lock); /* NOTE(review): dd is handed back after the lock is dropped; confirm nothing can free it while the caller uses it */
+
+       return dd;
+}
+
+static struct dlm_dir_dump *init_dir_dump(struct dlm_ls *ls, int nodeid) /* start a fresh dump context for nodeid; NULL if allocation fails */
+{
+       struct dlm_dir_dump *dd;
+
+       dd = lookup_dir_dump(ls, nodeid);
+       if (dd) { /* an earlier dump for this node is still registered */
+               log_error(ls, "found ongoing dir dump for node %d, will drop it",
+                         nodeid);
+               drop_dir_ctx(ls, nodeid); /* frees it; dd is overwritten below before any further use */
+       }
+
+       dd = kzalloc(sizeof(*dd), GFP_ATOMIC); /* zeroed: last, sent_res, sent_msg start at 0. NOTE(review): GFP_ATOMIC presumably because dlm_copy_master_names() calls this under read_lock(ls_masters_lock) */
+       if (!dd)
+               return NULL;
+
+       dd->seq_init = ls->ls_recover_seq; /* checked again on each later chunk */
+       dd->nodeid_init = nodeid; /* lookup key used by lookup_dir_dump()/drop_dir_ctx() */
+
+       write_lock(&ls->ls_dir_dump_lock);
+       list_add(&dd->list, &ls->ls_dir_dump_list); /* register so the next chunk request can find it */
+       write_unlock(&ls->ls_dir_dump_lock);
+
+       return dd;
+}
+
 /* Find the rsb where we left off (or start again), then send rsb names
    for rsb's we're master of and whose directory node matches the requesting
    node.  inbuf is the rsb name last sent, inlen is the name's length */
@@ -234,11 +308,20 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
        struct list_head *list;
        struct dlm_rsb *r;
        int offset = 0, dir_nodeid;
+       struct dlm_dir_dump *dd;
        __be16 be_namelen;
 
        read_lock(&ls->ls_masters_lock);
 
        if (inlen > 1) {
+               dd = lookup_dir_dump(ls, nodeid);
+               if (!dd) {
+                       log_error(ls, "failed to lookup dir dump context nodeid: %d",
+                                 nodeid);
+                       goto out;
+               }
+
+               /* next chunk in dump */
                r = find_rsb_root(ls, inbuf, inlen);
                if (!r) {
                        log_error(ls, "copy_master_names from %d start %d %.*s",
@@ -246,8 +329,25 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
                        goto out;
                }
                list = r->res_masters_list.next;
+
+               /* sanity checks */
+               if (dd->last != &r->res_masters_list ||
+                   dd->seq_init != ls->ls_recover_seq) {
+                       log_error(ls, "failed dir dump sanity check seq_init: %llu seq: %llu",
+                                 (unsigned long long)dd->seq_init,
+                                 (unsigned long long)ls->ls_recover_seq);
+                       goto out;
+               }
        } else {
+               dd = init_dir_dump(ls, nodeid);
+               if (!dd) {
+                       log_error(ls, "failed to allocate dir dump context");
+                       goto out;
+               }
+
+               /* start dump */
                list = ls->ls_masters_list.next;
+               dd->last = list;
        }
 
        for (offset = 0; list != &ls->ls_masters_list; list = list->next) {
@@ -269,7 +369,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
                        be_namelen = cpu_to_be16(0);
                        memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
                        offset += sizeof(__be16);
-                       ls->ls_recover_dir_sent_msg++;
+                       dd->sent_msg++;
                        goto out;
                }
 
@@ -278,7 +378,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
                offset += sizeof(__be16);
                memcpy(outbuf + offset, r->res_name, r->res_length);
                offset += r->res_length;
-               ls->ls_recover_dir_sent_res++;
+               dd->sent_res++;
+               dd->last = list;
        }
 
        /*
@@ -288,10 +389,18 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 
        if ((list == &ls->ls_masters_list) &&
            (offset + sizeof(uint16_t) <= outlen)) {
+               /* end dump */
                be_namelen = cpu_to_be16(0xFFFF);
                memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
                offset += sizeof(__be16);
-               ls->ls_recover_dir_sent_msg++;
+               dd->sent_msg++;
+               log_rinfo(ls, "dlm_recover_directory nodeid %d sent %u res out %u messages",
+                         nodeid, dd->sent_res, dd->sent_msg);
+
+               write_lock(&ls->ls_dir_dump_lock);
+               list_del_init(&dd->list);
+               write_unlock(&ls->ls_dir_dump_lock);
+               kfree(dd);
        }
  out:
        read_unlock(&ls->ls_masters_lock);
index f434325d5bc87b33942020651eeca216a073c9c4..e03a379832d59e0ad12f53cdc333ab27799f063e 100644 (file)
@@ -660,8 +660,6 @@ struct dlm_ls {
        struct mutex            ls_requestqueue_mutex;
        struct dlm_rcom         *ls_recover_buf;
        int                     ls_recover_nodeid; /* for debugging */
-       unsigned int            ls_recover_dir_sent_res; /* for log info */
-       unsigned int            ls_recover_dir_sent_msg; /* for log info */
        unsigned int            ls_recover_locks_in; /* for log info */
        uint64_t                ls_rcom_seq;
        spinlock_t              ls_rcom_spin;
@@ -676,6 +674,8 @@ struct dlm_ls {
 
        struct list_head        ls_masters_list; /* root resources */
        rwlock_t                ls_masters_lock; /* protect root_list */
+       struct list_head        ls_dir_dump_list; /* in-progress dir dumps (struct dlm_dir_dump) */
+       rwlock_t                ls_dir_dump_lock; /* protect ls_dir_dump_list */
 
        const struct dlm_lockspace_ops *ls_ops;
        void                    *ls_ops_arg;
index da756e5c0f6c4c15fd7b09f27c81f60a568cc68e..af7769f8e38c5b64d9686bef98e8f507daf6e041 100644 (file)
@@ -582,6 +582,8 @@ static int new_lockspace(const char *name, const char *cluster,
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_masters_list);
        rwlock_init(&ls->ls_masters_lock);
+       INIT_LIST_HEAD(&ls->ls_dir_dump_list);
+       rwlock_init(&ls->ls_dir_dump_lock);
 
        spin_lock(&lslist_lock);
        ls->ls_create_count = 1;
index f6acc735162524e5e9a524514abde52fa82a0d1b..0b1a62167798cbc21ac0219f82ab9bf251c93a03 100644 (file)
@@ -173,8 +173,6 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
                goto fail_root_list;
        }
 
-       ls->ls_recover_dir_sent_res = 0;
-       ls->ls_recover_dir_sent_msg = 0;
        ls->ls_recover_locks_in = 0;
 
        dlm_set_recover_status(ls, DLM_RS_NODES);
@@ -211,9 +209,6 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
        dlm_release_masters_list(ls);
 
-       log_rinfo(ls, "dlm_recover_directory %u out %u messages",
-                 ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);
-
        /*
         * We may have outstanding operations that are waiting for a reply from
         * a failed node.  Mark these to be resent after recovery.  Unlock and