]> git.ipfire.org Git - thirdparty/kernel/linux.git/commitdiff
dlm: use a new list for recovery of master rsb names
authorAlexander Aring <aahringo@redhat.com>
Tue, 2 Apr 2024 19:18:00 +0000 (15:18 -0400)
committerDavid Teigland <teigland@redhat.com>
Tue, 9 Apr 2024 16:44:49 +0000 (11:44 -0500)
Add a new "masters_list" for master rsb structs, with a new
rwlock. The new list is created and used during the recovery
process to send the master rsb names to new nodes. With this
change, the current "root_list" can be used without locking.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
fs/dlm/dir.c
fs/dlm/dlm_internal.h
fs/dlm/lock.c
fs/dlm/lockspace.c
fs/dlm/recoverd.c

index f6acba4310a7b90aadb5f8f18c6ab560c6ed2cd6..10753486049a6fa7a426cf613eb389ff306e2ed6 100644 (file)
@@ -216,16 +216,13 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
        if (!rv)
                return r;
 
-       down_read(&ls->ls_root_sem);
-       list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+       list_for_each_entry(r, &ls->ls_masters_list, res_masters_list) {
                if (len == r->res_length && !memcmp(name, r->res_name, len)) {
-                       up_read(&ls->ls_root_sem);
                        log_debug(ls, "find_rsb_root revert to root_list %s",
                                  r->res_name);
                        return r;
                }
        }
-       up_read(&ls->ls_root_sem);
        return NULL;
 }
 
@@ -241,7 +238,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
        int offset = 0, dir_nodeid;
        __be16 be_namelen;
 
-       down_read(&ls->ls_root_sem);
+       read_lock(&ls->ls_masters_lock);
 
        if (inlen > 1) {
                r = find_rsb_root(ls, inbuf, inlen);
@@ -250,16 +247,13 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
                                  nodeid, inlen, inlen, inbuf);
                        goto out;
                }
-               list = r->res_root_list.next;
+               list = r->res_masters_list.next;
        } else {
-               list = ls->ls_root_list.next;
+               list = ls->ls_masters_list.next;
        }
 
-       for (offset = 0; list != &ls->ls_root_list; list = list->next) {
-               r = list_entry(list, struct dlm_rsb, res_root_list);
-               if (r->res_nodeid)
-                       continue;
-
+       for (offset = 0; list != &ls->ls_masters_list; list = list->next) {
+               r = list_entry(list, struct dlm_rsb, res_masters_list);
                dir_nodeid = dlm_dir_nodeid(r);
                if (dir_nodeid != nodeid)
                        continue;
@@ -294,7 +288,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
         * terminating record.
         */
 
-       if ((list == &ls->ls_root_list) &&
+       if ((list == &ls->ls_masters_list) &&
            (offset + sizeof(uint16_t) <= outlen)) {
                be_namelen = cpu_to_be16(0xFFFF);
                memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
@@ -302,6 +296,6 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
                ls->ls_recover_dir_sent_msg++;
        }
  out:
-       up_read(&ls->ls_root_sem);
+       read_unlock(&ls->ls_masters_lock);
 }
 
index 1d2ee5c2d23dacbb70a3aa7f964d04d76fea984b..3524f2b33f2cce07e4445f9e590346573f787c11 100644 (file)
@@ -342,6 +342,7 @@ struct dlm_rsb {
        struct list_head        res_waitqueue;
 
        struct list_head        res_root_list;      /* used for recovery */
+       struct list_head        res_masters_list;   /* used for recovery */
        struct list_head        res_recover_list;   /* used for recovery */
        int                     res_recover_locks_count;
 
@@ -675,6 +676,8 @@ struct dlm_ls {
 
        struct list_head        ls_root_list;   /* root resources */
        struct rw_semaphore     ls_root_sem;    /* protect root_list */
+       struct list_head        ls_masters_list;        /* master rsbs, built for recovery */
+       rwlock_t                ls_masters_lock;        /* protect masters_list */
 
        const struct dlm_lockspace_ops *ls_ops;
        void                    *ls_ops_arg;
index d87464614bc5eb494fd470c4ae8466504c4c0900..e0ab7432ca4d2c1781b41441dc4fe424fd2d7376 100644 (file)
@@ -423,6 +423,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
        INIT_LIST_HEAD(&r->res_waitqueue);
        INIT_LIST_HEAD(&r->res_root_list);
        INIT_LIST_HEAD(&r->res_recover_list);
+       INIT_LIST_HEAD(&r->res_masters_list);
 
        *r_ret = r;
        return 0;
@@ -1168,6 +1169,7 @@ static void kill_rsb(struct kref *kref)
        DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
+       DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
 }
 
 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
index 0455dddb0797c95c4276b533a810a525fabb144d..c427c76b5f0744bb26fbc302bd5f2f2ab53aae13 100644 (file)
@@ -582,6 +582,8 @@ static int new_lockspace(const char *name, const char *cluster,
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);
+       INIT_LIST_HEAD(&ls->ls_masters_list);
+       rwlock_init(&ls->ls_masters_lock);
 
        spin_lock(&lslist_lock);
        ls->ls_create_count = 1;
index 8eb42554ccb0ae5d5a8f43f6b71ef8e2e80f6468..dfce8fc6a7834d6f45ff6a78cdd801cef39e7a9e 100644 (file)
 #include "requestqueue.h"
 #include "recoverd.h"
 
+static int dlm_create_masters_list(struct dlm_ls *ls)
+{
+       struct rb_node *n;
+       struct dlm_rsb *r;
+       int i, error = 0;
+
+       write_lock(&ls->ls_masters_lock);
+       if (!list_empty(&ls->ls_masters_list)) {
+               log_error(ls, "root list not empty");
+               error = -EINVAL;
+               goto out;
+       }
+
+       for (i = 0; i < ls->ls_rsbtbl_size; i++) {
+               spin_lock_bh(&ls->ls_rsbtbl[i].lock);
+               for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+                       r = rb_entry(n, struct dlm_rsb, res_hashnode);
+                       if (r->res_nodeid)
+                               continue;
+
+                       list_add(&r->res_masters_list, &ls->ls_masters_list);
+                       dlm_hold_rsb(r);
+               }
+               spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
+       }
+ out:
+       write_unlock(&ls->ls_masters_lock);
+       return error;
+}
+
+static void dlm_release_masters_list(struct dlm_ls *ls)
+{
+       struct dlm_rsb *r, *safe;
+
+       write_lock(&ls->ls_masters_lock);
+       list_for_each_entry_safe(r, safe, &ls->ls_masters_list, res_masters_list) {
+               list_del_init(&r->res_masters_list);
+               dlm_put_rsb(r);
+       }
+       write_unlock(&ls->ls_masters_lock);
+}
+
 static void dlm_create_root_list(struct dlm_ls *ls)
 {
        struct rb_node *n;
@@ -123,6 +165,23 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
        dlm_recover_dir_nodeid(ls);
 
+       /* Create a snapshot of all active rsbs that we are the master of.
+        * During the barrier between dlm_recover_members_wait() and
+        * dlm_recover_directory() other nodes can dump their necessary
+        * directory dlm_rsb (r->res_dir_nodeid == nodeid) in rcom
+        * communication dlm_copy_master_names() handling.
+        *
+        * TODO We should create a per lockspace list that contains rsbs
+        * that we are the master of. Instead of creating this list during
+        * recovery, we would keep track of those rsbs during normal lock
+        * handling so that recovery can use the list when necessary.
+        */
+       error = dlm_create_masters_list(ls);
+       if (error) {
+               log_rinfo(ls, "dlm_create_masters_list error %d", error);
+               goto fail;
+       }
+
        ls->ls_recover_dir_sent_res = 0;
        ls->ls_recover_dir_sent_msg = 0;
        ls->ls_recover_locks_in = 0;
@@ -132,6 +191,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
        error = dlm_recover_members_wait(ls, rv->seq);
        if (error) {
                log_rinfo(ls, "dlm_recover_members_wait error %d", error);
+               dlm_release_masters_list(ls);
                goto fail;
        }
 
@@ -145,6 +205,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
        error = dlm_recover_directory(ls, rv->seq);
        if (error) {
                log_rinfo(ls, "dlm_recover_directory error %d", error);
+               dlm_release_masters_list(ls);
                goto fail;
        }
 
@@ -153,9 +214,12 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
        error = dlm_recover_directory_wait(ls, rv->seq);
        if (error) {
                log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
+               dlm_release_masters_list(ls);
                goto fail;
        }
 
+       dlm_release_masters_list(ls);
+
        log_rinfo(ls, "dlm_recover_directory %u out %u messages",
                  ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);