Previously, the dlm locking only protected several
functions which write to the superblock (update_super,
add_to_super and store_super), and we missed other
funcs such as add_internal_bitmap. We also need to
call the funcs which read the superblock under the
locking protection to avoid consistency issues.
So let's remove the dlm stuff from super1.c, and
provide the locking mechanism in main() instead, except
for assemble mode, which will be handled in the next commit.
Since we can identify whether it is a clustered raid
or not by checking the different conditions of each
mode, the change should not have any effect on native
arrays.
We also improve the existing locking code as follows:
1. replace ls_unlock with ls_unlock_wait, since we
should only return once the unlock operation is complete.
2. inspired by lvm, first try to open an existing
lockspace before blindly creating a new one, in case
the lockspace was not released for some reason.
3. retry several times before quitting if EAGAIN is
returned while locking.
Note: for MANAGE mode, we do not need to get the lock if
a node just wants to confirm a device change; otherwise we
can't add a disk to the cluster, since all nodes would be
competing for the lock.
Reviewed-by: NeilBrown <neilb@suse.com>
Signed-off-by: Guoqing Jiang <gqjiang@suse.com>
Signed-off-by: Jes Sorensen <jsorensen@fb.com>
struct mddev_dev *devlist = NULL;
struct mddev_dev **devlistend = & devlist;
struct mddev_dev *dv;
struct mddev_dev *devlist = NULL;
struct mddev_dev **devlistend = & devlist;
struct mddev_dev *dv;
+ mdu_array_info_t array;
int devs_found = 0;
char *symlinks = NULL;
int grow_continue = 0;
int devs_found = 0;
char *symlinks = NULL;
int grow_continue = 0;
FILE *outf;
int mdfd = -1;
FILE *outf;
int mdfd = -1;
srandom(time(0) ^ getpid());
srandom(time(0) ^ getpid());
/* --scan implied --brief unless -vv */
c.brief = 1;
/* --scan implied --brief unless -vv */
c.brief = 1;
+ if (mode == CREATE) {
+ if (s.bitmap_file && strcmp(s.bitmap_file, "clustered") == 0) {
+ locked = cluster_get_dlmlock();
+ if (locked != 1)
+ exit(1);
+ }
+ } else if (mode == MANAGE || mode == GROW || mode == INCREMENTAL) {
+ if (!md_get_array_info(mdfd, &array) && (devmode != 'c')) {
+ if (array.state & (1 << MD_SB_CLUSTERED)) {
+ locked = cluster_get_dlmlock();
+ if (locked != 1)
+ exit(1);
+ }
+ }
+ }
+
switch(mode) {
case MANAGE:
/* readonly, add/remove, readwrite, runstop */
switch(mode) {
case MANAGE:
/* readonly, add/remove, readwrite, runstop */
+ if (locked)
+ cluster_release_dlmlock();
if (mdfd > 0)
close(mdfd);
exit(rv);
if (mdfd > 0)
close(mdfd);
exit(rv);
dlm_lshandle_t (*create_lockspace)(const char *name,
unsigned int mode);
dlm_lshandle_t (*create_lockspace)(const char *name,
unsigned int mode);
+ dlm_lshandle_t (*open_lockspace)(const char *name);
int (*release_lockspace)(const char *name, dlm_lshandle_t ls,
int force);
int (*ls_lock)(dlm_lshandle_t lockspace, uint32_t mode,
int (*release_lockspace)(const char *name, dlm_lshandle_t ls,
int force);
int (*ls_lock)(dlm_lshandle_t lockspace, uint32_t mode,
uint32_t parent, void (*astaddr) (void *astarg),
void *astarg, void (*bastaddr) (void *astarg),
void *range);
uint32_t parent, void (*astaddr) (void *astarg),
void *astarg, void (*bastaddr) (void *astarg),
void *range);
- int (*ls_unlock)(dlm_lshandle_t lockspace, uint32_t lkid,
- uint32_t flags, struct dlm_lksb *lksb,
- void *astarg);
+ int (*ls_unlock_wait)(dlm_lshandle_t lockspace, uint32_t lkid,
+ uint32_t flags, struct dlm_lksb *lksb);
int (*ls_get_fd)(dlm_lshandle_t ls);
int (*dispatch)(int fd);
};
extern int get_cluster_name(char **name);
extern int dlm_funs_ready(void);
int (*ls_get_fd)(dlm_lshandle_t ls);
int (*dispatch)(int fd);
};
extern int get_cluster_name(char **name);
extern int dlm_funs_ready(void);
-extern int cluster_get_dlmlock(int *lockid);
-extern int cluster_release_dlmlock(int lockid);
+extern int cluster_get_dlmlock(void);
+extern int cluster_release_dlmlock(void);
extern void set_dlm_hooks(void);
#define _ROUND_UP(val, base) (((val) + (base) - 1) & ~(base - 1))
extern void set_dlm_hooks(void);
#define _ROUND_UP(val, base) (((val) + (base) - 1) & ~(base - 1))
* ignored.
*/
int rv = 0;
* ignored.
*/
int rv = 0;
struct mdp_superblock_1 *sb = st->sb;
bitmap_super_t *bms = (bitmap_super_t*)(((char*)sb) + MAX_SB_SIZE);
struct mdp_superblock_1 *sb = st->sb;
bitmap_super_t *bms = (bitmap_super_t*)(((char*)sb) + MAX_SB_SIZE);
- if (bms->version == BITMAP_MAJOR_CLUSTERED && dlm_funs_ready()) {
- rv = cluster_get_dlmlock(&lockid);
- if (rv) {
- pr_err("Cannot get dlmlock in %s return %d\n",
- __func__, rv);
- cluster_release_dlmlock(lockid);
- return rv;
- }
- }
-
if (strcmp(update, "homehost") == 0 &&
homehost) {
/* Note that 'homehost' is special as it is really
if (strcmp(update, "homehost") == 0 &&
homehost) {
/* Note that 'homehost' is special as it is really
rv = -1;
sb->sb_csum = calc_sb_1_csum(sb);
rv = -1;
sb->sb_csum = calc_sb_1_csum(sb);
- if (bms->version == BITMAP_MAJOR_CLUSTERED && dlm_funs_ready())
- cluster_release_dlmlock(lockid);
struct mdp_superblock_1 *sb = st->sb;
__u16 *rp = sb->dev_roles + dk->number;
struct devinfo *di, **dip;
struct mdp_superblock_1 *sb = st->sb;
__u16 *rp = sb->dev_roles + dk->number;
struct devinfo *di, **dip;
- bitmap_super_t *bms = (bitmap_super_t*)(((char*)sb) + MAX_SB_SIZE);
- int rv, lockid;
- if (bms->version == BITMAP_MAJOR_CLUSTERED && dlm_funs_ready()) {
- rv = cluster_get_dlmlock(&lockid);
- if (rv) {
- pr_err("Cannot get dlmlock in %s return %d\n",
- __func__, rv);
- cluster_release_dlmlock(lockid);
- return rv;
- }
- }
-
dk_state = dk->state & ~(1<<MD_DISK_FAILFAST);
if ((dk_state & (1<<MD_DISK_ACTIVE)) &&
(dk_state & (1<<MD_DISK_SYNC)))/* active, sync */
dk_state = dk->state & ~(1<<MD_DISK_FAILFAST);
if ((dk_state & (1<<MD_DISK_ACTIVE)) &&
(dk_state & (1<<MD_DISK_SYNC)))/* active, sync */
di->next = NULL;
*dip = di;
di->next = NULL;
*dip = di;
- if (bms->version == BITMAP_MAJOR_CLUSTERED && dlm_funs_ready())
- cluster_release_dlmlock(lockid);
-
struct align_fd afd;
int sbsize;
unsigned long long dsize;
struct align_fd afd;
int sbsize;
unsigned long long dsize;
- bitmap_super_t *bms = (bitmap_super_t*)(((char*)sb) + MAX_SB_SIZE);
- int rv, lockid;
-
- if (bms->version == BITMAP_MAJOR_CLUSTERED && dlm_funs_ready()) {
- rv = cluster_get_dlmlock(&lockid);
- if (rv) {
- pr_err("Cannot get dlmlock in %s return %d\n",
- __func__, rv);
- cluster_release_dlmlock(lockid);
- return rv;
- }
- }
if (!get_dev_size(fd, NULL, &dsize))
return 1;
if (!get_dev_size(fd, NULL, &dsize))
return 1;
- if (bms->version == BITMAP_MAJOR_CLUSTERED && dlm_funs_ready())
- cluster_release_dlmlock(lockid);
static char *cluster_name = NULL;
/* Create the lockspace, take bitmapXXX locks on all the bitmaps. */
static char *cluster_name = NULL;
/* Create the lockspace, take bitmapXXX locks on all the bitmaps. */
-int cluster_get_dlmlock(int *lockid)
+int cluster_get_dlmlock(void)
{
int ret = -1;
char str[64];
int flags = LKF_NOQUEUE;
{
int ret = -1;
char str[64];
int flags = LKF_NOQUEUE;
+ int retry_count = 0;
+
+ if (!dlm_funs_ready()) {
+ pr_err("Something wrong with dlm library\n");
+ return -1;
+ }
ret = get_cluster_name(&cluster_name);
if (ret) {
ret = get_cluster_name(&cluster_name);
if (ret) {
}
dlm_lock_res = xmalloc(sizeof(struct dlm_lock_resource));
}
dlm_lock_res = xmalloc(sizeof(struct dlm_lock_resource));
- dlm_lock_res->ls = dlm_hooks->create_lockspace(cluster_name, O_RDWR);
+ dlm_lock_res->ls = dlm_hooks->open_lockspace(cluster_name);
- pr_err("%s failed to create lockspace\n", cluster_name);
- return -ENOMEM;
+ dlm_lock_res->ls = dlm_hooks->create_lockspace(cluster_name, O_RDWR);
+ if (!dlm_lock_res->ls) {
+ pr_err("%s failed to create lockspace\n", cluster_name);
+ return -ENOMEM;
+ }
+ } else {
+ pr_err("open existed %s lockspace\n", cluster_name);
}
snprintf(str, 64, "bitmap%s", cluster_name);
}
snprintf(str, 64, "bitmap%s", cluster_name);
ret = dlm_hooks->ls_lock(dlm_lock_res->ls, LKM_PWMODE,
&dlm_lock_res->lksb, flags, str, strlen(str),
0, dlm_ast, dlm_lock_res, NULL, NULL);
if (ret) {
pr_err("error %d when get PW mode on lock %s\n", errno, str);
ret = dlm_hooks->ls_lock(dlm_lock_res->ls, LKM_PWMODE,
&dlm_lock_res->lksb, flags, str, strlen(str),
0, dlm_ast, dlm_lock_res, NULL, NULL);
if (ret) {
pr_err("error %d when get PW mode on lock %s\n", errno, str);
+ /* let's try several times if EAGAIN happened */
+ if (dlm_lock_res->lksb.sb_status == EAGAIN && retry_count < 10) {
+ sleep(10);
+ retry_count++;
+ goto retry;
+ }
dlm_hooks->release_lockspace(cluster_name, dlm_lock_res->ls, 1);
return ret;
}
/* Wait for it to complete */
poll_for_ast(dlm_lock_res->ls);
dlm_hooks->release_lockspace(cluster_name, dlm_lock_res->ls, 1);
return ret;
}
/* Wait for it to complete */
poll_for_ast(dlm_lock_res->ls);
- *lockid = dlm_lock_res->lksb.sb_lkid;
- return dlm_lock_res->lksb.sb_status;
+ if (dlm_lock_res->lksb.sb_status) {
+ pr_err("failed to lock cluster\n");
+ return -1;
+ }
+ return 1;
-int cluster_release_dlmlock(int lockid)
+int cluster_release_dlmlock(void)
{
int ret = -1;
if (!cluster_name)
{
int ret = -1;
if (!cluster_name)
+ goto out;
+
+ if (!dlm_lock_res->lksb.sb_lkid)
+ goto out;
- ret = dlm_hooks->ls_unlock(dlm_lock_res->ls, lockid, 0,
- &dlm_lock_res->lksb, dlm_lock_res);
+ ret = dlm_hooks->ls_unlock_wait(dlm_lock_res->ls,
+ dlm_lock_res->lksb.sb_lkid, 0,
+ &dlm_lock_res->lksb);
if (ret) {
pr_err("error %d happened when unlock\n", errno);
/* XXX make sure the lock is unlocked eventually */
if (ret) {
pr_err("error %d happened when unlock\n", errno);
/* XXX make sure the lock is unlocked eventually */
if (!dlm_hooks->dlm_handle)
return;
if (!dlm_hooks->dlm_handle)
return;
+ dlm_hooks->open_lockspace =
+ dlsym(dlm_hooks->dlm_handle, "dlm_open_lockspace");
dlm_hooks->create_lockspace =
dlsym(dlm_hooks->dlm_handle, "dlm_create_lockspace");
dlm_hooks->release_lockspace =
dlsym(dlm_hooks->dlm_handle, "dlm_release_lockspace");
dlm_hooks->ls_lock = dlsym(dlm_hooks->dlm_handle, "dlm_ls_lock");
dlm_hooks->create_lockspace =
dlsym(dlm_hooks->dlm_handle, "dlm_create_lockspace");
dlm_hooks->release_lockspace =
dlsym(dlm_hooks->dlm_handle, "dlm_release_lockspace");
dlm_hooks->ls_lock = dlsym(dlm_hooks->dlm_handle, "dlm_ls_lock");
- dlm_hooks->ls_unlock = dlsym(dlm_hooks->dlm_handle, "dlm_ls_unlock");
+ dlm_hooks->ls_unlock_wait =
+ dlsym(dlm_hooks->dlm_handle, "dlm_ls_unlock_wait");
dlm_hooks->ls_get_fd = dlsym(dlm_hooks->dlm_handle, "dlm_ls_get_fd");
dlm_hooks->dispatch = dlsym(dlm_hooks->dlm_handle, "dlm_dispatch");
dlm_hooks->ls_get_fd = dlsym(dlm_hooks->dlm_handle, "dlm_ls_get_fd");
dlm_hooks->dispatch = dlsym(dlm_hooks->dlm_handle, "dlm_dispatch");
- if (!dlm_hooks->create_lockspace || !dlm_hooks->ls_lock ||
- !dlm_hooks->ls_unlock || !dlm_hooks->release_lockspace ||
- !dlm_hooks->ls_get_fd || !dlm_hooks->dispatch)
+ if (!dlm_hooks->open_lockspace || !dlm_hooks->create_lockspace ||
+ !dlm_hooks->ls_lock || !dlm_hooks->ls_unlock_wait ||
+ !dlm_hooks->release_lockspace || !dlm_hooks->ls_get_fd ||
+ !dlm_hooks->dispatch)
dlclose(dlm_hooks->dlm_handle);
else
is_dlm_hooks_ready = 1;
dlclose(dlm_hooks->dlm_handle);
else
is_dlm_hooks_ready = 1;