]> git.ipfire.org Git - thirdparty/linux.git/commitdiff
dlm: fix race between final callback and remove
authorAlexander Aring <aahringo@redhat.com>
Thu, 28 Mar 2024 15:48:41 +0000 (11:48 -0400)
committerDavid Teigland <teigland@redhat.com>
Mon, 1 Apr 2024 18:31:12 +0000 (13:31 -0500)
This patch fixes the following issue:

node 1 is dir
node 2 is master
node 3 is other

1->2: unlock
2: put final lkb, rsb moved to toss
2->1: unlock_reply
1: queue lkb callback with EUNLOCK
2->1: remove
1: receive_remove ignored (rsb on keep because of queued lkb callback)
1: complete lkb callback, put_lkb, move rsb to toss
3->1: lookup
1->3: lookup_reply master=2
3->2: request
2->3: request_reply EBADR

In summary:
An unexpected lkb reference causes the rsb to remain on the wrong list.
The rsb being on the wrong list causes receive_remove to be ignored.
An ignored receive_remove causes inconsistent dir and master state.

This sequence requires an unusually long delay in delivering the unlock
callback, because the remove message from 2->1 usually happens after
some seconds.  So, it's not known exactly how frequently this sequence
occurs in pratice.  It's possible that the same end result could also
have another unknown cause.

The solution for this issue is to further separate callback state
from the lkb, so that an lkb reference (and from that, an rsb ref)
are not held while a callback remains queued.  Then, within the
unlock_reply, the lkb will be freed and the rsb moved to the toss
list. So, the receive_remove will not be ignored.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
fs/dlm/ast.c
fs/dlm/ast.h
fs/dlm/dlm_internal.h
fs/dlm/lock.c
fs/dlm/user.c

index 5ea0b62f276b940e307440a5ccfb4af06980ecf8..e9da812352b4c89e28b682d27750b63fb79451b9 100644 (file)
@@ -37,12 +37,32 @@ void dlm_callback_set_last_ptr(struct dlm_callback **from,
        *from = to;
 }
 
-int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
-                            int status, uint32_t sbflags)
+static void dlm_callback_work(struct work_struct *work)
 {
-       struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+       struct dlm_callback *cb = container_of(work, struct dlm_callback, work);
+
+       if (cb->flags & DLM_CB_BAST) {
+               trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name,
+                              cb->res_length);
+               cb->bastfn(cb->astparam, cb->mode);
+       } else if (cb->flags & DLM_CB_CAST) {
+               trace_dlm_ast(cb->ls_id, cb->lkb_id, cb->sb_status,
+                             cb->sb_flags, cb->res_name, cb->res_length);
+               cb->lkb_lksb->sb_status = cb->sb_status;
+               cb->lkb_lksb->sb_flags = cb->sb_flags;
+               cb->astfn(cb->astparam);
+       }
+
+       kref_put(&cb->ref, dlm_release_callback);
+}
+
+int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
+                          int status, uint32_t sbflags,
+                          struct dlm_callback **cb)
+{
+       struct dlm_rsb *rsb = lkb->lkb_resource;
        int rv = DLM_ENQUEUE_CALLBACK_SUCCESS;
-       struct dlm_callback *cb;
+       struct dlm_ls *ls = rsb->res_ls;
        int copy_lvb = 0;
        int prev_mode;
 
@@ -88,57 +108,46 @@ int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
                }
        }
 
-       cb = dlm_allocate_cb();
-       if (!cb) {
+       *cb = dlm_allocate_cb();
+       if (!*cb) {
                rv = DLM_ENQUEUE_CALLBACK_FAILURE;
                goto out;
        }
 
-       cb->flags = flags;
-       cb->mode = mode;
-       cb->sb_status = status;
-       cb->sb_flags = (sbflags & 0x000000FF);
-       cb->copy_lvb = copy_lvb;
-       kref_init(&cb->ref);
-       if (!test_and_set_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags))
-               rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED;
+       /* for tracing */
+       (*cb)->lkb_id = lkb->lkb_id;
+       (*cb)->ls_id = ls->ls_global_id;
+       memcpy((*cb)->res_name, rsb->res_name, rsb->res_length);
+       (*cb)->res_length = rsb->res_length;
 
-       list_add_tail(&cb->list, &lkb->lkb_callbacks);
+       (*cb)->flags = flags;
+       (*cb)->mode = mode;
+       (*cb)->sb_status = status;
+       (*cb)->sb_flags = (sbflags & 0x000000FF);
+       (*cb)->copy_lvb = copy_lvb;
+       (*cb)->lkb_lksb = lkb->lkb_lksb;
+       kref_init(&(*cb)->ref);
 
        if (flags & DLM_CB_BAST) {
                lkb->lkb_last_bast_time = ktime_get();
-               lkb->lkb_last_bast_mode = cb->mode;
+               lkb->lkb_last_bast_mode = mode;
        } else if (flags & DLM_CB_CAST) {
-               dlm_callback_set_last_ptr(&lkb->lkb_last_cast, cb);
+               dlm_callback_set_last_ptr(&lkb->lkb_last_cast, *cb);
                lkb->lkb_last_cast_time = ktime_get();
        }
 
-       dlm_callback_set_last_ptr(&lkb->lkb_last_cb, cb);
+       dlm_callback_set_last_ptr(&lkb->lkb_last_cb, *cb);
+       rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED;
 
- out:
+out:
        return rv;
 }
 
-int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb)
-{
-       /* oldest undelivered cb is callbacks first entry */
-       *cb = list_first_entry_or_null(&lkb->lkb_callbacks,
-                                      struct dlm_callback, list);
-       if (!*cb)
-               return DLM_DEQUEUE_CALLBACK_EMPTY;
-
-       /* remove it from callbacks so shift others down */
-       list_del(&(*cb)->list);
-       if (list_empty(&lkb->lkb_callbacks))
-               return DLM_DEQUEUE_CALLBACK_LAST;
-
-       return DLM_DEQUEUE_CALLBACK_SUCCESS;
-}
-
 void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
-               uint32_t sbflags)
+                 uint32_t sbflags)
 {
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+       struct dlm_callback *cb;
        int rv;
 
        if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
@@ -146,18 +155,20 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
                return;
        }
 
-       spin_lock(&lkb->lkb_cb_lock);
-       rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
+       rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags,
+                                   &cb);
        switch (rv) {
        case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
-               kref_get(&lkb->lkb_ref);
+               cb->astfn = lkb->lkb_astfn;
+               cb->bastfn = lkb->lkb_bastfn;
+               cb->astparam = lkb->lkb_astparam;
+               INIT_WORK(&cb->work, dlm_callback_work);
 
                spin_lock(&ls->ls_cb_lock);
-               if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
-                       list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
-               } else {
-                       queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
-               }
+               if (test_bit(LSFL_CB_DELAY, &ls->ls_flags))
+                       list_add(&cb->list, &ls->ls_cb_delay);
+               else
+                       queue_work(ls->ls_callback_wq, &cb->work);
                spin_unlock(&ls->ls_cb_lock);
                break;
        case DLM_ENQUEUE_CALLBACK_SUCCESS:
@@ -168,67 +179,12 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
                WARN_ON_ONCE(1);
                break;
        }
-       spin_unlock(&lkb->lkb_cb_lock);
-}
-
-void dlm_callback_work(struct work_struct *work)
-{
-       struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work);
-       struct dlm_ls *ls = lkb->lkb_resource->res_ls;
-       struct dlm_rsb *rsb = lkb->lkb_resource;
-       void (*castfn) (void *astparam);
-       void (*bastfn) (void *astparam, int mode);
-       struct dlm_callback *cb;
-       int rv;
-
-       spin_lock(&lkb->lkb_cb_lock);
-       rv = dlm_dequeue_lkb_callback(lkb, &cb);
-       if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY)) {
-               clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
-               spin_unlock(&lkb->lkb_cb_lock);
-               goto out;
-       }
-       spin_unlock(&lkb->lkb_cb_lock);
-
-       for (;;) {
-               castfn = lkb->lkb_astfn;
-               bastfn = lkb->lkb_bastfn;
-
-               if (cb->flags & DLM_CB_BAST) {
-                       trace_dlm_bast(ls->ls_global_id, lkb->lkb_id,
-                                      cb->mode, rsb->res_name,
-                                      rsb->res_length);
-                       bastfn(lkb->lkb_astparam, cb->mode);
-               } else if (cb->flags & DLM_CB_CAST) {
-                       lkb->lkb_lksb->sb_status = cb->sb_status;
-                       lkb->lkb_lksb->sb_flags = cb->sb_flags;
-                       trace_dlm_ast(ls->ls_global_id, lkb->lkb_id,
-                                     cb->sb_flags, cb->sb_status,
-                                     rsb->res_name, rsb->res_length);
-                       castfn(lkb->lkb_astparam);
-               }
-
-               kref_put(&cb->ref, dlm_release_callback);
-
-               spin_lock(&lkb->lkb_cb_lock);
-               rv = dlm_dequeue_lkb_callback(lkb, &cb);
-               if (rv == DLM_DEQUEUE_CALLBACK_EMPTY) {
-                       clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
-                       spin_unlock(&lkb->lkb_cb_lock);
-                       break;
-               }
-               spin_unlock(&lkb->lkb_cb_lock);
-       }
-
-out:
-       /* undo kref_get from dlm_add_callback, may cause lkb to be freed */
-       dlm_put_lkb(lkb);
 }
 
 int dlm_callback_start(struct dlm_ls *ls)
 {
-       ls->ls_callback_wq = alloc_workqueue("dlm_callback",
-                                            WQ_HIGHPRI | WQ_MEM_RECLAIM, 0);
+       ls->ls_callback_wq = alloc_ordered_workqueue("dlm_callback",
+                                                    WQ_HIGHPRI | WQ_MEM_RECLAIM);
        if (!ls->ls_callback_wq) {
                log_print("can't start dlm_callback workqueue");
                return -ENOMEM;
@@ -257,7 +213,7 @@ void dlm_callback_suspend(struct dlm_ls *ls)
 
 void dlm_callback_resume(struct dlm_ls *ls)
 {
-       struct dlm_lkb *lkb, *safe;
+       struct dlm_callback *cb, *safe;
        int count = 0, sum = 0;
        bool empty;
 
@@ -266,9 +222,9 @@ void dlm_callback_resume(struct dlm_ls *ls)
 
 more:
        spin_lock(&ls->ls_cb_lock);
-       list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
-               list_del_init(&lkb->lkb_cb_list);
-               queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
+       list_for_each_entry_safe(cb, safe, &ls->ls_cb_delay, list) {
+               list_del(&cb->list);
+               queue_work(ls->ls_callback_wq, &cb->work);
                count++;
                if (count == MAX_CB_QUEUE)
                        break;
index ce007892dc2d30aa41d68db9d72ed9000d2cd7b6..9bd12409e1ee73688dc0e05ffd2aa2741064696d 100644 (file)
 #define DLM_ENQUEUE_CALLBACK_NEED_SCHED        1
 #define DLM_ENQUEUE_CALLBACK_SUCCESS   0
 #define DLM_ENQUEUE_CALLBACK_FAILURE   -1
-int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
-                            int status, uint32_t sbflags);
-#define DLM_DEQUEUE_CALLBACK_EMPTY     2
-#define DLM_DEQUEUE_CALLBACK_LAST      1
-#define DLM_DEQUEUE_CALLBACK_SUCCESS   0
-int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb);
+int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
+                          int status, uint32_t sbflags,
+                          struct dlm_callback **cb);
 void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
                 uint32_t sbflags);
 void dlm_callback_set_last_ptr(struct dlm_callback **from,
                               struct dlm_callback *to);
 
 void dlm_release_callback(struct kref *ref);
-void dlm_callback_work(struct work_struct *work);
 int dlm_callback_start(struct dlm_ls *ls);
 void dlm_callback_stop(struct dlm_ls *ls);
 void dlm_callback_suspend(struct dlm_ls *ls);
index a9137c90f34831eaa9d1aea320a4286c1d450592..cda1526fd15b1f6cc828c07c4deabc63f0e75897 100644 (file)
@@ -16,6 +16,7 @@
  * This is the main header file to be included in each DLM source file.
  */
 
+#include <uapi/linux/dlm_device.h>
 #include <linux/slab.h>
 #include <linux/sched.h>
 #include <linux/types.h>
@@ -204,8 +205,7 @@ struct dlm_args {
 #define DLM_IFL_OVERLAP_CANCEL_BIT 20
 #define DLM_IFL_ENDOFLIFE_BIT  21
 #define DLM_IFL_DEADLOCK_CANCEL_BIT 24
-#define DLM_IFL_CB_PENDING_BIT 25
-#define __DLM_IFL_MAX_BIT      DLM_IFL_CB_PENDING_BIT
+#define __DLM_IFL_MAX_BIT      DLM_IFL_DEADLOCK_CANCEL_BIT
 
 /* lkb_dflags */
 
@@ -217,12 +217,45 @@ struct dlm_args {
 #define DLM_CB_CAST            0x00000001
 #define DLM_CB_BAST            0x00000002
 
+/* much of this is just saving user space pointers associated with the
+ * lock that we pass back to the user lib with an ast
+ */
+
+struct dlm_user_args {
+       struct dlm_user_proc    *proc; /* each process that opens the lockspace
+                                       * device has private data
+                                       * (dlm_user_proc) on the struct file,
+                                       * the process's locks point back to it
+                                       */
+       struct dlm_lksb         lksb;
+       struct dlm_lksb __user  *user_lksb;
+       void __user             *castparam;
+       void __user             *castaddr;
+       void __user             *bastparam;
+       void __user             *bastaddr;
+       uint64_t                xid;
+};
+
 struct dlm_callback {
        uint32_t                flags;          /* DLM_CBF_ */
        int                     sb_status;      /* copy to lksb status */
        uint8_t                 sb_flags;       /* copy to lksb flags */
        int8_t                  mode; /* rq mode of bast, gr mode of cast */
-       int                     copy_lvb;
+       bool                    copy_lvb;
+       struct dlm_lksb         *lkb_lksb;
+       unsigned char           lvbptr[DLM_USER_LVB_LEN];
+
+       union {
+               void                    *astparam;      /* caller's ast arg */
+               struct dlm_user_args    ua;
+       };
+       struct work_struct      work;
+       void                    (*bastfn)(void *astparam, int mode);
+       void                    (*astfn)(void *astparam);
+       char                    res_name[DLM_RESNAME_MAXLEN];
+       size_t                  res_length;
+       uint32_t                ls_id;
+       uint32_t                lkb_id;
 
        struct list_head        list;
        struct kref             ref;
@@ -256,10 +289,6 @@ struct dlm_lkb {
        struct list_head        lkb_ownqueue;   /* list of locks for a process */
        ktime_t                 lkb_timestamp;
 
-       spinlock_t              lkb_cb_lock;
-       struct work_struct      lkb_cb_work;
-       struct list_head        lkb_cb_list; /* for ls_cb_delay or proc->asts */
-       struct list_head        lkb_callbacks;
        struct dlm_callback     *lkb_last_cast;
        struct dlm_callback     *lkb_last_cb;
        int                     lkb_last_bast_mode;
@@ -688,23 +717,6 @@ struct dlm_ls {
 #define LSFL_CB_DELAY          9
 #define LSFL_NODIR             10
 
-/* much of this is just saving user space pointers associated with the
-   lock that we pass back to the user lib with an ast */
-
-struct dlm_user_args {
-       struct dlm_user_proc    *proc; /* each process that opens the lockspace
-                                         device has private data
-                                         (dlm_user_proc) on the struct file,
-                                         the process's locks point back to it*/
-       struct dlm_lksb         lksb;
-       struct dlm_lksb __user  *user_lksb;
-       void __user             *castparam;
-       void __user             *castaddr;
-       void __user             *bastparam;
-       void __user             *bastaddr;
-       uint64_t                xid;
-};
-
 #define DLM_PROC_FLAGS_CLOSING 1
 #define DLM_PROC_FLAGS_COMPAT  2
 
index fd752dd03896348504200202199299b4389eb869..198672446dcdddb4936809a9a92abebef9096635 100644 (file)
@@ -1203,10 +1203,6 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
        kref_init(&lkb->lkb_ref);
        INIT_LIST_HEAD(&lkb->lkb_ownqueue);
        INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
-       INIT_LIST_HEAD(&lkb->lkb_cb_list);
-       INIT_LIST_HEAD(&lkb->lkb_callbacks);
-       spin_lock_init(&lkb->lkb_cb_lock);
-       INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
 
        idr_preload(GFP_NOFS);
        spin_lock(&ls->ls_lkbidr_spin);
@@ -6003,6 +5999,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
 
 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 {
+       struct dlm_callback *cb, *cb_safe;
        struct dlm_lkb *lkb, *safe;
 
        dlm_lock_recovery(ls);
@@ -6032,10 +6029,9 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
                dlm_put_lkb(lkb);
        }
 
-       list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
-               dlm_purge_lkb_callbacks(lkb);
-               list_del_init(&lkb->lkb_cb_list);
-               dlm_put_lkb(lkb);
+       list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
+               list_del(&cb->list);
+               kref_put(&cb->ref, dlm_release_callback);
        }
 
        spin_unlock(&ls->ls_clear_proc_locks);
@@ -6044,6 +6040,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 
 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 {
+       struct dlm_callback *cb, *cb_safe;
        struct dlm_lkb *lkb, *safe;
 
        while (1) {
@@ -6073,10 +6070,9 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
        spin_unlock(&proc->locks_spin);
 
        spin_lock(&proc->asts_spin);
-       list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
-               dlm_purge_lkb_callbacks(lkb);
-               list_del_init(&lkb->lkb_cb_list);
-               dlm_put_lkb(lkb);
+       list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
+               list_del(&cb->list);
+               kref_put(&cb->ref, dlm_release_callback);
        }
        spin_unlock(&proc->asts_spin);
 }
index fa99b6074e5ce4cf1eb42b37567b8532826f9acb..334a6d64d413fe48511532368e78dfdb0eac5375 100644 (file)
@@ -21,6 +21,7 @@
 #include "dlm_internal.h"
 #include "lockspace.h"
 #include "lock.h"
+#include "lvb_table.h"
 #include "user.h"
 #include "ast.h"
 #include "config.h"
@@ -144,24 +145,6 @@ static void compat_output(struct dlm_lock_result *res,
 }
 #endif
 
-/* should held proc->asts_spin lock */
-void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb)
-{
-       struct dlm_callback *cb, *safe;
-
-       list_for_each_entry_safe(cb, safe, &lkb->lkb_callbacks, list) {
-               list_del(&cb->list);
-               kref_put(&cb->ref, dlm_release_callback);
-       }
-
-       clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
-
-       /* invalidate */
-       dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
-       dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL);
-       lkb->lkb_last_bast_mode = -1;
-}
-
 /* Figure out if this lock is at the end of its life and no longer
    available for the application to use.  The lkb still exists until
    the final ast is read.  A lock becomes EOL in three situations:
@@ -198,6 +181,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
        struct dlm_ls *ls;
        struct dlm_user_args *ua;
        struct dlm_user_proc *proc;
+       struct dlm_callback *cb;
        int rv;
 
        if (test_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags) ||
@@ -229,11 +213,18 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
 
        spin_lock(&proc->asts_spin);
 
-       rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
+       rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags, &cb);
        switch (rv) {
        case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
-               kref_get(&lkb->lkb_ref);
-               list_add_tail(&lkb->lkb_cb_list, &proc->asts);
+               cb->ua = *ua;
+               cb->lkb_lksb = &cb->ua.lksb;
+               if (cb->copy_lvb) {
+                       memcpy(cb->lvbptr, ua->lksb.sb_lvbptr,
+                              DLM_USER_LVB_LEN);
+                       cb->lkb_lksb->sb_lvbptr = cb->lvbptr;
+               }
+
+               list_add_tail(&cb->list, &proc->asts);
                wake_up_interruptible(&proc->wait);
                break;
        case DLM_ENQUEUE_CALLBACK_SUCCESS:
@@ -801,10 +792,8 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
                           loff_t *ppos)
 {
        struct dlm_user_proc *proc = file->private_data;
-       struct dlm_lkb *lkb;
        DECLARE_WAITQUEUE(wait, current);
        struct dlm_callback *cb;
-       struct dlm_rsb *rsb;
        int rv, ret;
 
        if (count == sizeof(struct dlm_device_version)) {
@@ -824,8 +813,6 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
 #endif
                return -EINVAL;
 
- try_another:
-
        /* do we really need this? can a read happen after a close? */
        if (test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags))
                return -EINVAL;
@@ -860,55 +847,24 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
           without removing lkb_cb_list; so empty lkb_cb_list is always
           consistent with empty lkb_callbacks */
 
-       lkb = list_first_entry(&proc->asts, struct dlm_lkb, lkb_cb_list);
-
-       rv = dlm_dequeue_lkb_callback(lkb, &cb);
-       switch (rv) {
-       case DLM_DEQUEUE_CALLBACK_EMPTY:
-               /* this shouldn't happen; lkb should have been removed from
-                * list when last item was dequeued
-                */
-               log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id);
-               list_del_init(&lkb->lkb_cb_list);
-               spin_unlock(&proc->asts_spin);
-               /* removes ref for proc->asts, may cause lkb to be freed */
-               dlm_put_lkb(lkb);
-               WARN_ON_ONCE(1);
-               goto try_another;
-       case DLM_DEQUEUE_CALLBACK_LAST:
-               list_del_init(&lkb->lkb_cb_list);
-               clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
-               break;
-       case DLM_DEQUEUE_CALLBACK_SUCCESS:
-               break;
-       default:
-               WARN_ON_ONCE(1);
-               break;
-       }
+       cb = list_first_entry(&proc->asts, struct dlm_callback, list);
+       list_del(&cb->list);
        spin_unlock(&proc->asts_spin);
 
-       rsb = lkb->lkb_resource;
        if (cb->flags & DLM_CB_BAST) {
-               trace_dlm_bast(rsb->res_ls->ls_global_id, lkb->lkb_id,
-                              cb->mode, rsb->res_name, rsb->res_length);
+               trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name,
+                              cb->res_length);
        } else if (cb->flags & DLM_CB_CAST) {
-               lkb->lkb_lksb->sb_status = cb->sb_status;
-               lkb->lkb_lksb->sb_flags = cb->sb_flags;
-               trace_dlm_ast(rsb->res_ls->ls_global_id, lkb->lkb_id,
-                             cb->sb_flags, cb->sb_status, rsb->res_name,
-                             rsb->res_length);
+               cb->lkb_lksb->sb_status = cb->sb_status;
+               cb->lkb_lksb->sb_flags = cb->sb_flags;
+               trace_dlm_ast(cb->ls_id, cb->lkb_id, cb->sb_status,
+                             cb->sb_flags, cb->res_name, cb->res_length);
        }
 
-       ret = copy_result_to_user(lkb->lkb_ua,
+       ret = copy_result_to_user(&cb->ua,
                                  test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags),
                                  cb->flags, cb->mode, cb->copy_lvb, buf, count);
-
        kref_put(&cb->ref, dlm_release_callback);
-
-       /* removes ref for proc->asts, may cause lkb to be freed */
-       if (rv == DLM_DEQUEUE_CALLBACK_LAST)
-               dlm_put_lkb(lkb);
-
        return ret;
 }