]> git.ipfire.org Git - thirdparty/linux.git/commitdiff
Merge tag 'dlm-6.9' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm
authorLinus Torvalds <torvalds@linux-foundation.org>
Mon, 18 Mar 2024 22:39:48 +0000 (15:39 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Mon, 18 Mar 2024 22:39:48 +0000 (15:39 -0700)
Pull dlm updates from David Teigland:

 - Fix mistaken variable assignment that caused a refcounting problem

 - Revert a recent change that began using atomic counters where they
   were not needed (for lkb wait_count)

 - Add comments around forced state reset for waiting lock operations
   during recovery

* tag 'dlm-6.9' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  dlm: add comments about forced waiters reset
  dlm: revert atomic_t lkb_wait_count
  dlm: fix user space lkb refcounting

fs/dlm/dlm_internal.h
fs/dlm/lock.c
fs/dlm/user.c

index dfc444dad3298aeb361cc29533278496a81f85da..3b4dbce849f0f268d8b6876f28d90a32e0d9c119 100644 (file)
@@ -246,7 +246,7 @@ struct dlm_lkb {
        int8_t                  lkb_highbast;   /* highest mode bast sent for */
 
        int8_t                  lkb_wait_type;  /* type of reply waiting for */
-       atomic_t                lkb_wait_count;
+       int8_t                  lkb_wait_count;
        int                     lkb_wait_nodeid; /* for debugging */
 
        struct list_head        lkb_statequeue; /* rsb g/c/w list */
index 652c51fbbf7685e0777d61c6142c26350247396e..fd752dd03896348504200202199299b4389eb869 100644 (file)
@@ -1407,7 +1407,6 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
 {
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error = 0;
-       int wc;
 
        mutex_lock(&ls->ls_waiters_mutex);
 
@@ -1429,17 +1428,20 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
                        error = -EBUSY;
                        goto out;
                }
-               wc = atomic_inc_return(&lkb->lkb_wait_count);
+               lkb->lkb_wait_count++;
                hold_lkb(lkb);
 
                log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
-                         lkb->lkb_id, lkb->lkb_wait_type, mstype, wc,
-                         dlm_iflags_val(lkb));
+                         lkb->lkb_id, lkb->lkb_wait_type, mstype,
+                         lkb->lkb_wait_count, dlm_iflags_val(lkb));
                goto out;
        }
 
-       wc = atomic_fetch_inc(&lkb->lkb_wait_count);
-       DLM_ASSERT(!wc, dlm_print_lkb(lkb); printk("wait_count %d\n", wc););
+       DLM_ASSERT(!lkb->lkb_wait_count,
+                  dlm_print_lkb(lkb);
+                  printk("wait_count %d\n", lkb->lkb_wait_count););
+
+       lkb->lkb_wait_count++;
        lkb->lkb_wait_type = mstype;
        lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
        hold_lkb(lkb);
@@ -1502,7 +1504,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
                log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
                          lkb->lkb_id);
                lkb->lkb_wait_type = 0;
-               atomic_dec(&lkb->lkb_wait_count);
+               lkb->lkb_wait_count--;
                unhold_lkb(lkb);
                goto out_del;
        }
@@ -1529,15 +1531,16 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
        if (overlap_done && lkb->lkb_wait_type) {
                log_error(ls, "remwait error %x reply %d wait_type %d overlap",
                          lkb->lkb_id, mstype, lkb->lkb_wait_type);
-               atomic_dec(&lkb->lkb_wait_count);
+               lkb->lkb_wait_count--;
                unhold_lkb(lkb);
                lkb->lkb_wait_type = 0;
        }
 
-       DLM_ASSERT(atomic_read(&lkb->lkb_wait_count), dlm_print_lkb(lkb););
+       DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
 
        clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
-       if (atomic_dec_and_test(&lkb->lkb_wait_count))
+       lkb->lkb_wait_count--;
+       if (!lkb->lkb_wait_count)
                list_del_init(&lkb->lkb_wait_reply);
        unhold_lkb(lkb);
        return 0;
@@ -2666,7 +2669,7 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
                        goto out;
 
                /* lock not allowed if there's any op in progress */
-               if (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count))
+               if (lkb->lkb_wait_type || lkb->lkb_wait_count)
                        goto out;
 
                if (is_overlap(lkb))
@@ -2728,7 +2731,7 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
 
        /* normal unlock not allowed if there's any op in progress */
        if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
-           (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count)))
+           (lkb->lkb_wait_type || lkb->lkb_wait_count))
                goto out;
 
        /* an lkb may be waiting for an rsb lookup to complete where the
@@ -5011,21 +5014,32 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
        return lkb;
 }
 
-/* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
-   master or dir-node for r.  Processing the lkb may result in it being placed
-   back on waiters. */
-
-/* We do this after normal locking has been enabled and any saved messages
-   (in requestqueue) have been processed.  We should be confident that at
-   this point we won't get or process a reply to any of these waiting
-   operations.  But, new ops may be coming in on the rsbs/locks here from
-   userspace or remotely. */
-
-/* there may have been an overlap unlock/cancel prior to recovery or after
-   recovery.  if before, the lkb may still have a pos wait_count; if after, the
-   overlap flag would just have been set and nothing new sent.  we can be
-   confident here than any replies to either the initial op or overlap ops
-   prior to recovery have been received. */
+/*
+ * Forced state reset for locks that were in the middle of remote operations
+ * when recovery happened (i.e. lkbs that were on the waiters list, waiting
+ * for a reply from a remote operation.)  The lkbs remaining on the waiters
+ * list need to be reevaluated; some may need resending to a different node
+ * than previously, and some may now need local handling rather than remote.
+ *
+ * First, the lkb state for the voided remote operation is forcibly reset,
+ * equivalent to what remove_from_waiters() would normally do:
+ * . lkb removed from ls_waiters list
+ * . lkb wait_type cleared
+ * . lkb waiters_count cleared
+ * . lkb ref count decremented for each waiters_count (almost always 1,
+ *   but possibly 2 in case of cancel/unlock overlapping, which means
+ *   two remote replies were being expected for the lkb.)
+ *
+ * Second, the lkb is reprocessed like an original operation would be,
+ * by passing it to _request_lock or _convert_lock, which will either
+ * process the lkb operation locally, or send it to a remote node again
+ * and put the lkb back onto the waiters list.
+ *
+ * When reprocessing the lkb, we may find that it's flagged for an overlapping
+ * force-unlock or cancel, either from before recovery began, or after recovery
+ * finished.  If this is the case, the unlock/cancel is done directly, and the
+ * original operation is not initiated again (no _request_lock/_convert_lock.)
+ */
 
 int dlm_recover_waiters_post(struct dlm_ls *ls)
 {
@@ -5040,6 +5054,11 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
                        break;
                }
 
+               /* 
+                * Find an lkb from the waiters list that's been affected by
+                * recovery node changes, and needs to be reprocessed.  Does
+                * hold_lkb(), adding a refcount.
+                */
                lkb = find_resend_waiter(ls);
                if (!lkb)
                        break;
@@ -5048,6 +5067,11 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
                hold_rsb(r);
                lock_rsb(r);
 
+               /*
+                * If the lkb has been flagged for a force unlock or cancel,
+                * then the reprocessing below will be replaced by just doing
+                * the unlock/cancel directly.
+                */
                mstype = lkb->lkb_wait_type;
                oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
                                        &lkb->lkb_iflags);
@@ -5061,22 +5085,40 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
                          r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
                          dlm_dir_nodeid(r), oc, ou);
 
-               /* At this point we assume that we won't get a reply to any
-                  previous op or overlap op on this lock.  First, do a big
-                  remove_from_waiters() for all previous ops. */
+               /*
+                * No reply to the pre-recovery operation will now be received,
+                * so a forced equivalent of remove_from_waiters() is needed to
+                * reset the waiters state that was in place before recovery.
+                */
 
                clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
+
+               /* Forcibly clear wait_type */
                lkb->lkb_wait_type = 0;
-               /* drop all wait_count references we still
-                * hold a reference for this iteration.
+
+               /*
+                * Forcibly reset wait_count and associated refcount.  The
+                * wait_count will almost always be 1, but in case of an
+                * overlapping unlock/cancel it could be 2: see where
+                * add_to_waiters() finds the lkb is already on the waiters
+                * list and does lkb_wait_count++; hold_lkb().
                 */
-               while (!atomic_dec_and_test(&lkb->lkb_wait_count))
+               while (lkb->lkb_wait_count) {
+                       lkb->lkb_wait_count--;
                        unhold_lkb(lkb);
+               }
 
+               /* Forcibly remove from waiters list */
                mutex_lock(&ls->ls_waiters_mutex);
                list_del_init(&lkb->lkb_wait_reply);
                mutex_unlock(&ls->ls_waiters_mutex);
 
+               /*
+                * The lkb is now clear of all prior waiters state and can be
+                * processed locally, or sent to remote node again, or directly
+                * cancelled/unlocked.
+                */
+
                if (oc || ou) {
                        /* do an unlock or cancel instead of resending */
                        switch (mstype) {
index 695e691b38b318395f241de0a2f28c5e48e30a4f..9f9b68448830ece12fd342e79ef1f2ccf6f9f1ae 100644 (file)
@@ -806,7 +806,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
        struct dlm_lkb *lkb;
        DECLARE_WAITQUEUE(wait, current);
        struct dlm_callback *cb;
-       int rv, copy_lvb = 0;
+       int rv, ret, copy_lvb = 0;
        int old_mode, new_mode;
 
        if (count == sizeof(struct dlm_device_version)) {
@@ -906,9 +906,9 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
                trace_dlm_ast(lkb->lkb_resource->res_ls, lkb);
        }
 
-       rv = copy_result_to_user(lkb->lkb_ua,
-                                test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags),
-                                cb->flags, cb->mode, copy_lvb, buf, count);
+       ret = copy_result_to_user(lkb->lkb_ua,
+                                 test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags),
+                                 cb->flags, cb->mode, copy_lvb, buf, count);
 
        kref_put(&cb->ref, dlm_release_callback);
 
@@ -916,7 +916,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
        if (rv == DLM_DEQUEUE_CALLBACK_LAST)
                dlm_put_lkb(lkb);
 
-       return rv;
+       return ret;
 }
 
 static __poll_t device_poll(struct file *file, poll_table *wait)