/* TODO move this to lib/refcount.c */
static __must_check bool
-dlm_refcount_dec_and_lock_bh(refcount_t *r, spinlock_t *lock)
+dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
__cond_acquires(lock)
{
if (refcount_dec_not_one(r))
return false;
- spin_lock_bh(lock);
+ write_lock_bh(lock);
if (!refcount_dec_and_test(r)) {
- spin_unlock_bh(lock);
+ write_unlock_bh(lock);
return false;
}

return true;
}
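
/* Editor's note: this follows the usual refcount_dec_and_lock() scheme,
 * adapted to an rwlock -- refcount_dec_not_one() handles the common case
 * (the reference being dropped is not the last one) without touching the
 * lock; only a possible 1 -> 0 transition takes the write lock, and the
 * count is re-checked under the lock because another context may have
 * taken a new reference in between.
 */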
/* TODO move this to include/linux/kref.h */
-static inline int dlm_kref_put_lock_bh(struct kref *kref,
- void (*release)(struct kref *kref),
- spinlock_t *lock)
+static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
+ void (*release)(struct kref *kref),
+ rwlock_t *lock)
{
- if (dlm_refcount_dec_and_lock_bh(&kref->refcount, lock)) {
+ if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
release(kref);
return 1;
}

return 0;
}
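
/* Editor's note: as with kref_put_lock(), a nonzero return means that
 * release() ran and the write lock is still held -- the caller must drop
 * it, which is what the put_rsb() hunk below does after toss_rsb().
 */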
struct dlm_ls *ls = r->res_ls;
int rv;
- rv = dlm_kref_put_lock_bh(&r->res_ref, toss_rsb,
- &ls->ls_rsbtbl_lock);
+ rv = dlm_kref_put_write_lock_bh(&r->res_ref, toss_rsb,
+ &ls->ls_rsbtbl_lock);
if (rv)
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
}
void dlm_put_rsb(struct dlm_rsb *r)
* a caching mechanism and the other holders might put
* this rsb out of the toss state.
*/
- rv = spin_trylock(&ls->ls_rsbtbl_lock);
+ rv = write_trylock(&ls->ls_rsbtbl_lock);
if (!rv) {
spin_unlock(&ls->ls_toss_q_lock);
/* rearm the timer to try again */
/* not necessary to hold the ls_rsbtbl_lock when
* calling send_remove()
*/
- spin_unlock(&ls->ls_rsbtbl_lock);
+ write_unlock(&ls->ls_rsbtbl_lock);
/* remove the rsb from the toss queue, it's gone
* from DLM now
static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
{
- int rv;
-
- rv = rhashtable_insert_fast(rhash, &rsb->res_node,
- dlm_rhash_rsb_params);
- if (rv == -EEXIST) {
- log_print("%s match", __func__);
- dlm_dump_rsb(rsb);
- }
-
- return rv;
+ return rhashtable_insert_fast(rhash, &rsb->res_node,
+ dlm_rhash_rsb_params);
}
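
/* Editor's note: -EEXIST is no longer logged here because, with lookups
 * now running under a read lock, two contexts can legitimately race to
 * insert the same rsb; each caller handles the collision itself, e.g.:
 *
 *	error = rsb_insert(r, &ls->ls_rsbtbl);
 *	if (error == -EEXIST) {
 *		write_unlock_bh(&ls->ls_rsbtbl_lock);
 *		dlm_free_rsb(r);	// lost the race, drop our copy
 *		goto retry_lookup;	// and start over
 *	}
 */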
/*
goto out;
}
- spin_lock_bh(&ls->ls_rsbtbl_lock);
+ retry_lookup:
+ /* check if the rsb is in keep state under read lock - likely path */
+ read_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
- if (error)
+ if (error) {
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
goto do_new;
+ }
/*
* rsb is active, so we can't check master_nodeid without lock_rsb.
*/
- if (rsb_flag(r, RSB_TOSS))
+ if (rsb_flag(r, RSB_TOSS)) {
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
goto do_toss;
+ }
kref_get(&r->res_ref);
- goto out_unlock;
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
+ goto out;
do_toss:
+ write_lock_bh(&ls->ls_rsbtbl_lock);
+
+ /* retry lookup under write lock to see if it's still in toss state;
+ * if not, it's in keep state and we relookup - unlikely path.
+ */
+ error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+ if (!error) {
+ if (!rsb_flag(r, RSB_TOSS)) {
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+ goto retry_lookup;
+ }
+ } else {
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+ goto do_new;
+ }
+
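
/* Editor's sketch (illustration only, not part of the patch): the hunks
 * above implement double-checked locking on the rwlock; the hypothetical
 * names lookup() and is_toss() stand in for dlm_search_rsb_tree() and
 * rsb_flag(r, RSB_TOSS):
 *
 *	retry:
 *		read_lock_bh(&tbl_lock);
 *		obj = lookup(key);
 *		if (obj && !is_toss(obj)) {
 *			kref_get(&obj->ref);		// fast path, read lock
 *			read_unlock_bh(&tbl_lock);
 *			return 0;
 *		}
 *		read_unlock_bh(&tbl_lock);
 *
 *		write_lock_bh(&tbl_lock);
 *		obj = lookup(key);		// revalidate: state may have
 *		if (obj && !is_toss(obj)) {	// changed between the locks
 *			write_unlock_bh(&tbl_lock);
 *			goto retry;
 *		}
 *		// obj == NULL: drop the lock and take the creation path;
 *		// otherwise reactivate obj under the write lock
 */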
/*
* rsb found inactive (master_nodeid may be out of date unless
we are the dir_nodeid or were the master). No other thread
log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
from_nodeid, r->res_master_nodeid, dir_nodeid,
r->res_name);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
error = -ENOTBLK;
- goto out_unlock;
+ goto out;
}
if ((r->res_master_nodeid != our_nodeid) && from_dir) {
*/
kref_init(&r->res_ref);
rsb_delete_toss_timer(ls, r);
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
- goto out_unlock;
+ goto out;
do_new:
*/
if (error == -EBADR && !create)
- goto out_unlock;
+ goto out;
error = get_rsb_struct(ls, name, len, &r);
- if (error == -EAGAIN) {
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ if (error == -EAGAIN)
goto retry;
- }
if (error)
- goto out_unlock;
+ goto out;
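
/* Editor's note (inferred): the unlock before the retry is gone because
 * every path into do_new now drops the read lock first, so
 * get_rsb_struct() runs without ls_rsbtbl_lock held.
 */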
r->res_hash = hash;
r->res_dir_nodeid = dir_nodeid;
dlm_free_rsb(r);
r = NULL;
error = -ENOTBLK;
- goto out_unlock;
+ goto out;
}
if (from_other) {
}
out_add:
+
+ write_lock_bh(&ls->ls_rsbtbl_lock);
error = rsb_insert(r, &ls->ls_rsbtbl);
- if (!error)
+ if (error == -EEXIST) {
+ /* somebody else was faster and the rsb seems to
+ * exist now; do a whole relookup
+ */
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+ dlm_free_rsb(r);
+ goto retry_lookup;
+ } else if (!error) {
list_add(&r->res_rsbs_list, &ls->ls_keep);
- out_unlock:
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ }
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
out:
*r_ret = r;
return error;
if (error < 0)
goto out;
- spin_lock_bh(&ls->ls_rsbtbl_lock);
+ retry_lookup:
+ /* check if the rsb is in keep state under read lock - likely path */
+ read_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
- if (error)
+ if (error) {
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
goto do_new;
+ }
- if (rsb_flag(r, RSB_TOSS))
+ if (rsb_flag(r, RSB_TOSS)) {
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
goto do_toss;
+ }
/*
* rsb is active, so we can't check master_nodeid without lock_rsb.
*/
kref_get(&r->res_ref);
- goto out_unlock;
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
+
+ goto out;
do_toss:
+ write_lock_bh(&ls->ls_rsbtbl_lock);
+
+ /* retry lookup under write lock to see if it's still in toss state;
+ * if not, it's in keep state and we relookup - unlikely path.
+ */
+ error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+ if (!error) {
+ if (!rsb_flag(r, RSB_TOSS)) {
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+ goto retry_lookup;
+ }
+ } else {
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+ goto do_new;
+ }
+
/*
* rsb found inactive. No other thread is using this rsb because
* it's on the toss list, so we can look at or update
log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
from_nodeid, r->res_master_nodeid, dir_nodeid);
dlm_print_rsb(r);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
error = -ENOTBLK;
- goto out_unlock;
+ goto out;
}
if (!recover && (r->res_master_nodeid != our_nodeid) &&
*/
kref_init(&r->res_ref);
rsb_delete_toss_timer(ls, r);
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
- goto out_unlock;
+ goto out;
do_new:
error = get_rsb_struct(ls, name, len, &r);
if (error == -EAGAIN) {
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
goto retry;
}
if (error)
- goto out_unlock;
+ goto out;
r->res_hash = hash;
r->res_dir_nodeid = dir_nodeid;
r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
kref_init(&r->res_ref);
+ write_lock_bh(&ls->ls_rsbtbl_lock);
error = rsb_insert(r, &ls->ls_rsbtbl);
- if (!error)
+ if (error == -EEXIST) {
+ /* somebody else was faster and the rsb seems to
+ * exist now; do a whole relookup
+ */
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+ dlm_free_rsb(r);
+ goto retry_lookup;
+ } else if (!error) {
list_add(&r->res_rsbs_list, &ls->ls_keep);
- out_unlock:
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ }
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+
out:
*r_ret = r;
return error;
if (error < 0)
return error;
- spin_lock_bh(&ls->ls_rsbtbl_lock);
+ retry_lookup:
+
+ /* check if the rsb is in keep state under read lock - likely path */
+ read_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (!error) {
- if (rsb_flag(r, RSB_TOSS))
+ if (rsb_flag(r, RSB_TOSS)) {
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
goto do_toss;
+ }
/* because the rsb is active, we need to lock_rsb before
* checking/changing res_master_nodeid
*/
hold_rsb(r);
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
lock_rsb(r);
__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
return 0;
} else {
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
goto not_found;
}
do_toss:
+ /* unlikely path - relookup under write lock */
+ write_lock_bh(&ls->ls_rsbtbl_lock);
+
+ /* rsb_mod_timer() requires ls_rsbtbl_lock to be held for writing;
+ * check if the rsb is still in toss state, if not relookup
+ */
+ error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+ if (!error) {
+ if (!rsb_flag(r, RSB_TOSS)) {
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+ /* something has changed, very unlikely but
+ * try again
+ */
+ goto retry_lookup;
+ }
+ } else {
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+ goto not_found;
+ }
+
/* because the rsb is inactive (on toss list), it's not refcounted
* and lock_rsb is not used, but is protected by the rsbtbl lock
*/
rsb_mod_timer(ls, r);
/* the rsb was inactive (on toss list) */
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
return 0;
not_found:
error = get_rsb_struct(ls, name, len, &r);
- if (error == -EAGAIN) {
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ if (error == -EAGAIN)
goto retry;
- }
if (error)
- goto out_unlock;
+ goto out;
r->res_hash = hash;
r->res_dir_nodeid = our_nodeid;
kref_init(&r->res_ref);
rsb_set_flag(r, RSB_TOSS);
+ write_lock_bh(&ls->ls_rsbtbl_lock);
error = rsb_insert(r, &ls->ls_rsbtbl);
- if (error) {
+ if (error == -EEXIST) {
+ /* somebody else was faster and the rsb seems to
+ * exist now; do a whole relookup
+ */
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
+ dlm_free_rsb(r);
+ goto retry_lookup;
+ } else if (error) {
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
/* should never happen */
dlm_free_rsb(r);
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
goto retry;
}
list_add(&r->res_rsbs_list, &ls->ls_toss);
rsb_mod_timer(ls, r);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
if (result)
*result = DLM_LU_ADD;
*r_nodeid = from_nodeid;
- out_unlock:
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ out:
return error;
}
{
struct dlm_rsb *r;
- spin_lock_bh(&ls->ls_rsbtbl_lock);
+ read_lock_bh(&ls->ls_rsbtbl_lock);
list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
if (r->res_hash == hash)
dlm_dump_rsb(r);
}
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
}
void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
struct dlm_rsb *r = NULL;
int error;
- spin_lock_bh(&ls->ls_rsbtbl_lock);
+ read_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (error)
goto out;
dlm_dump_rsb(r);
out:
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
}
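
/* Editor's note: the dump helpers only read ls_keep and the hash table,
 * and membership is only changed with the write lock held, so a read
 * lock is sufficient for the iteration above.
 */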
static void toss_rsb(struct kref *kref)
DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
+/* TODO move this to lib/refcount.c */
+static __must_check bool
+dlm_refcount_dec_and_lock_bh(refcount_t *r, spinlock_t *lock)
+__cond_acquires(lock)
+{
+ if (refcount_dec_not_one(r))
+ return false;
+
+ spin_lock_bh(lock);
+ if (!refcount_dec_and_test(r)) {
+ spin_unlock_bh(lock);
+ return false;
+ }
+
+ return true;
+}
+
+/* TODO move this to include/linux/kref.h */
+static inline int dlm_kref_put_lock_bh(struct kref *kref,
+ void (*release)(struct kref *kref),
+ spinlock_t *lock)
+{
+ if (dlm_refcount_dec_and_lock_bh(&kref->refcount, lock)) {
+ release(kref);
+ return 1;
+ }
+
+ return 0;
+}
+
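/* Editor's note (assumption): the spinlock flavors are re-added here,
 * unchanged, because other DLM refcounts -- e.g. the lkb freed via
 * __put_lkb() below -- are still released under a spinlock; only
 * ls_rsbtbl_lock changed type in this patch.
 */
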
/* __put_lkb() is used when an lkb may not have an rsb attached to
it, so we need to provide the lockspace explicitly */
memset(name, 0, sizeof(name));
memcpy(name, ms->m_extra, len);
- spin_lock_bh(&ls->ls_rsbtbl_lock);
+ write_lock_bh(&ls->ls_rsbtbl_lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (rv) {
/* should not happen */
log_error(ls, "%s from %d not found %s", __func__,
from_nodeid, name);
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
return;
}
log_error(ls, "receive_remove keep from %d master %d",
from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r);
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
return;
}
log_debug(ls, "receive_remove from %d master %d first %x %s",
from_nodeid, r->res_master_nodeid, r->res_first_lkid,
name);
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
return;
}
log_error(ls, "receive_remove toss from %d master %d",
from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r);
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
return;
}
list_del(&r->res_rsbs_list);
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
dlm_rhash_rsb_params);
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ write_unlock_bh(&ls->ls_rsbtbl_lock);
free_toss_rsb(r);
}
{
struct dlm_rsb *r;
- spin_lock_bh(&ls->ls_rsbtbl_lock);
+ read_lock_bh(&ls->ls_rsbtbl_lock);
list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
if (!rsb_flag(r, RSB_RECOVER_GRANT))
continue;
continue;
}
hold_rsb(r);
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
return r;
}
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ read_unlock_bh(&ls->ls_rsbtbl_lock);
return NULL;
}
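
/* Editor's summary (compiled from the hunks above) of how the converted
 * lock is used:
 *
 *	read_lock_bh(&ls->ls_rsbtbl_lock)  - lookup fast paths and
 *		keep-state checks, kref_get() on active rsbs, dump/debug
 *		iteration over ls_keep;
 *	write_lock_bh(&ls->ls_rsbtbl_lock) - rsb_insert(), toss-state
 *		transitions and toss timer updates, receive_remove() and
 *		rhashtable_remove_fast(), plus the final kref release in
 *		dlm_kref_put_write_lock_bh().
 */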