]> git.ipfire.org Git - thirdparty/linux.git/commitdiff
dlm: use rcu to avoid an extra rsb struct lookup
authorAlexander Aring <aahringo@redhat.com>
Mon, 10 Jun 2024 20:26:03 +0000 (15:26 -0500)
committerDavid Teigland <teigland@redhat.com>
Mon, 10 Jun 2024 20:26:03 +0000 (15:26 -0500)
Use rcu to free rsb structs, and hold the rcu read lock
while looking up rsb structs.  This allows us to avoid an
extra hash table lookup for an rsb.  A new rsb flag HASHED
is added which is set while the rsb is in the hash table.
This flag is checked in place of repeating the hash table
lookup.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
fs/dlm/dlm_internal.h
fs/dlm/lock.c
fs/dlm/memory.c

index 8184843159061e0749ab2512ce97f7f2509b76ac..e06fa17c5603792a139cb67bdecd0b42715c2c63 100644 (file)
@@ -333,6 +333,7 @@ struct dlm_rsb {
        struct list_head        res_masters_list;   /* used for recovery */
        struct list_head        res_recover_list;   /* used for recovery */
        int                     res_recover_locks_count;
+       struct rcu_head         rcu;
 
        char                    *res_lvbptr;
        char                    res_name[DLM_RESNAME_MAXLEN+1];
@@ -366,6 +367,7 @@ enum rsb_flags {
        RSB_RECOVER_GRANT,
        RSB_RECOVER_LVB_INVAL,
        RSB_INACTIVE,
+       RSB_HASHED, /* set while rsb is on ls_rsbtbl */
 };
 
 static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag)
index 5ca3f29bef7dcd85ab319f301814ee7305000b9a..8bee4f444afdc02485f9680585c0ebe6f0ec0cc1 100644 (file)
@@ -563,6 +563,7 @@ void dlm_rsb_scan(struct timer_list *timer)
                list_del(&r->res_slow_list);
                rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
                                       dlm_rhash_rsb_params);
+               rsb_clear_flag(r, RSB_HASHED);
 
                /* ls_rsbtbl_lock is not needed when calling send_remove() */
                write_unlock(&ls->ls_rsbtbl_lock);
@@ -636,8 +637,14 @@ int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
 
 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
 {
-       return rhashtable_insert_fast(rhash, &rsb->res_node,
-                                     dlm_rhash_rsb_params);
+       int rv;
+
+       rv = rhashtable_insert_fast(rhash, &rsb->res_node,
+                                   dlm_rhash_rsb_params);
+       if (!rv)
+               rsb_set_flag(rsb, RSB_HASHED);
+
+       return rv;
 }
 
 /*
@@ -752,11 +759,23 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
  do_inactive:
        write_lock_bh(&ls->ls_rsbtbl_lock);
 
-       /* retry lookup under write lock to see if its still in inactive state
-        * if not it's in active state and we relookup - unlikely path.
+       /*
+        * The expectation here is that the rsb will have HASHED and
+        * INACTIVE flags set, and that the rsb can be moved from
+        * inactive back to active again.  However, between releasing
+        * the read lock and acquiring the write lock, this rsb could
+        * have been removed from rsbtbl, and had HASHED cleared, to
+        * be freed.  To deal with this case, we would normally need
+        * to repeat dlm_search_rsb_tree while holding the write lock,
+        * but rcu allows us to simply check the HASHED flag, because
+        * the rcu read lock means the rsb will not be freed yet.
+        * If the HASHED flag is not set, then the rsb is being freed,
+        * so we add a new rsb struct.  If the HASHED flag is set,
+        * and INACTIVE is not set, it means another thread has
+        * made the rsb active, as we're expecting to do here, and
+        * we just repeat the lookup (this will be very unlikely.)
         */
-       error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-       if (!error) {
+       if (rsb_flag(r, RSB_HASHED)) {
                if (!rsb_flag(r, RSB_INACTIVE)) {
                        write_unlock_bh(&ls->ls_rsbtbl_lock);
                        goto retry;
@@ -926,11 +945,8 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
  do_inactive:
        write_lock_bh(&ls->ls_rsbtbl_lock);
 
-       /* retry lookup under write lock to see if its still inactive.
-        * if it's active, repeat lookup - unlikely path.
-        */
-       error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-       if (!error) {
+       /* See comment in find_rsb_dir. */
+       if (rsb_flag(r, RSB_HASHED)) {
                if (!rsb_flag(r, RSB_INACTIVE)) {
                        write_unlock_bh(&ls->ls_rsbtbl_lock);
                        goto retry;
@@ -1012,12 +1028,54 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
        return error;
 }
 
+/*
+ * rsb rcu usage
+ *
+ * While rcu read lock is held, the rsb cannot be freed,
+ * which allows a lookup optimization.
+ *
+ * Two threads are accessing the same rsb concurrently,
+ * the first (A) is trying to use the rsb, the second (B)
+ * is trying to free the rsb.
+ *
+ * thread A                 thread B
+ * (trying to use rsb)      (trying to free rsb)
+ *
+ * A1. rcu read lock
+ * A2. rsbtbl read lock
+ * A3. look up rsb in rsbtbl
+ * A4. rsbtbl read unlock
+ *                          B1. rsbtbl write lock
+ *                          B2. look up rsb in rsbtbl
+ *                          B3. remove rsb from rsbtbl
+ *                          B4. clear rsb HASHED flag
+ *                          B5. rsbtbl write unlock
+ *                          B6. begin freeing rsb using rcu...
+ *
+ * (rsb is inactive, so try to make it active again)
+ * A5. read rsb HASHED flag (safe because rsb is not freed yet)
+ * A6. the rsb HASHED flag is not set, which it means the rsb
+ *     is being removed from rsbtbl and freed, so don't use it.
+ * A7. rcu read unlock
+ *
+ *                          B7. ...finish freeing rsb using rcu
+ * A8. create a new rsb
+ *
+ * Without the rcu optimization, steps A5-8 would need to do
+ * an extra rsbtbl lookup:
+ * A5. rsbtbl write lock
+ * A6. look up rsb in rsbtbl, not found
+ * A7. rsbtbl write unlock
+ * A8. create a new rsb
+ */
+
 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
                    int from_nodeid, unsigned int flags,
                    struct dlm_rsb **r_ret)
 {
        int dir_nodeid;
        uint32_t hash;
+       int rv;
 
        if (len > DLM_RESNAME_MAXLEN)
                return -EINVAL;
@@ -1025,12 +1083,15 @@ static int find_rsb(struct dlm_ls *ls, const void *name, int len,
        hash = jhash(name, len, 0);
        dir_nodeid = dlm_hash2nodeid(ls, hash);
 
+       rcu_read_lock();
        if (dlm_no_directory(ls))
-               return find_rsb_nodir(ls, name, len, hash, dir_nodeid,
+               rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
                                      from_nodeid, flags, r_ret);
        else
-               return find_rsb_dir(ls, name, len, hash, dir_nodeid,
+               rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
                                    from_nodeid, flags, r_ret);
+       rcu_read_unlock();
+       return rv;
 }
 
 /* we have received a request and found that res_master_nodeid != our_nodeid,
@@ -1187,8 +1248,8 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
  */
 
-int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
-                     int len, unsigned int flags, int *r_nodeid, int *result)
+static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
+                             int len, unsigned int flags, int *r_nodeid, int *result)
 {
        struct dlm_rsb *r = NULL;
        uint32_t hash;
@@ -1315,6 +1376,16 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
        return error;
 }
 
+int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
+                     int len, unsigned int flags, int *r_nodeid, int *result)
+{
+       int rv;
+       rcu_read_lock();
+       rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result);
+       rcu_read_unlock();
+       return rv;
+}
+
 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 {
        struct dlm_rsb *r;
@@ -4293,6 +4364,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
        list_del(&r->res_slow_list);
        rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
                               dlm_rhash_rsb_params);
+       rsb_clear_flag(r, RSB_HASHED);
        write_unlock_bh(&ls->ls_rsbtbl_lock);
 
        free_inactive_rsb(r);
index 15a8b1cee43360a83d36a9df563fa4e75558d845..105a79978706bdb5848a1c3d4a5d753a0622b169 100644 (file)
@@ -101,13 +101,19 @@ struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls)
        return r;
 }
 
-void dlm_free_rsb(struct dlm_rsb *r)
+static void __free_rsb_rcu(struct rcu_head *rcu)
 {
+       struct dlm_rsb *r = container_of(rcu, struct dlm_rsb, rcu);
        if (r->res_lvbptr)
                dlm_free_lvb(r->res_lvbptr);
        kmem_cache_free(rsb_cache, r);
 }
 
+void dlm_free_rsb(struct dlm_rsb *r)
+{
+       call_rcu(&r->rcu, __free_rsb_rcu);
+}
+
 struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls)
 {
        struct dlm_lkb *lkb;