]> git.ipfire.org Git - thirdparty/kernel/linux.git/commitdiff
ceph: wait for the first reply of inflight async unlink
authorXiubo Li <xiubli@redhat.com>
Tue, 10 May 2022 01:47:01 +0000 (09:47 +0800)
committerIlya Dryomov <idryomov@gmail.com>
Tue, 2 Aug 2022 22:54:12 +0000 (00:54 +0200)
In async unlink case the kclient won't wait for the first reply
from MDS and just drop all the links and unhash the dentry and then
succeeds immediately.

For any new create/link/rename, etc. requests that follow using the
same file name we must wait for the first reply of the inflight
unlink request, or the MDS may fail these following
requests with -EEXIST if the inflight async unlink request was
delayed for some reason.

And the worst case is that a non-async openc request will
successfully open the file if the CDentry hasn't been unlinked yet,
but later the previously delayed async unlink request will remove the
CDentry. That means the just created file may be deleted later
by accident.

We need to wait for the inflight async unlink requests to finish
when creating new files/directories by using the same file names.

Link: https://tracker.ceph.com/issues/55332
Signed-off-by: Xiubo Li <xiubli@redhat.com>
Reviewed-by: Jeff Layton <jlayton@kernel.org>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
fs/ceph/dir.c
fs/ceph/file.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/super.c
fs/ceph/super.h

index eae417d71136411a675497cdada19ff64d43b1a0..e7e2ebac330d8c74befc708dfd2a854bc032d7a5 100644 (file)
@@ -856,6 +856,10 @@ static int ceph_mknod(struct user_namespace *mnt_userns, struct inode *dir,
        if (ceph_snap(dir) != CEPH_NOSNAP)
                return -EROFS;
 
+       err = ceph_wait_on_conflict_unlink(dentry);
+       if (err)
+               return err;
+
        if (ceph_quota_is_max_files_exceeded(dir)) {
                err = -EDQUOT;
                goto out;
@@ -918,6 +922,10 @@ static int ceph_symlink(struct user_namespace *mnt_userns, struct inode *dir,
        if (ceph_snap(dir) != CEPH_NOSNAP)
                return -EROFS;
 
+       err = ceph_wait_on_conflict_unlink(dentry);
+       if (err)
+               return err;
+
        if (ceph_quota_is_max_files_exceeded(dir)) {
                err = -EDQUOT;
                goto out;
@@ -968,9 +976,13 @@ static int ceph_mkdir(struct user_namespace *mnt_userns, struct inode *dir,
        struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
        struct ceph_mds_request *req;
        struct ceph_acl_sec_ctx as_ctx = {};
-       int err = -EROFS;
+       int err;
        int op;
 
+       err = ceph_wait_on_conflict_unlink(dentry);
+       if (err)
+               return err;
+
        if (ceph_snap(dir) == CEPH_SNAPDIR) {
                /* mkdir .snap/foo is a MKSNAP */
                op = CEPH_MDS_OP_MKSNAP;
@@ -980,6 +992,7 @@ static int ceph_mkdir(struct user_namespace *mnt_userns, struct inode *dir,
                dout("mkdir dir %p dn %p mode 0%ho\n", dir, dentry, mode);
                op = CEPH_MDS_OP_MKDIR;
        } else {
+               err = -EROFS;
                goto out;
        }
 
@@ -1037,6 +1050,10 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
        struct ceph_mds_request *req;
        int err;
 
+       err = ceph_wait_on_conflict_unlink(dentry);
+       if (err)
+               return err;
+
        if (ceph_snap(dir) != CEPH_NOSNAP)
                return -EROFS;
 
@@ -1071,9 +1088,27 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
 static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_request *req)
 {
+       struct dentry *dentry = req->r_dentry;
+       struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
+       struct ceph_dentry_info *di = ceph_dentry(dentry);
        int result = req->r_err ? req->r_err :
                        le32_to_cpu(req->r_reply_info.head->result);
 
+       if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
+               pr_warn("%s dentry %p:%pd async unlink bit is not set\n",
+                       __func__, dentry, dentry);
+
+       spin_lock(&fsc->async_unlink_conflict_lock);
+       hash_del_rcu(&di->hnode);
+       spin_unlock(&fsc->async_unlink_conflict_lock);
+
+       spin_lock(&dentry->d_lock);
+       di->flags &= ~CEPH_DENTRY_ASYNC_UNLINK;
+       wake_up_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT);
+       spin_unlock(&dentry->d_lock);
+
+       synchronize_rcu();
+
        if (result == -EJUKEBOX)
                goto out;
 
@@ -1081,7 +1116,7 @@ static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
        if (result) {
                int pathlen = 0;
                u64 base = 0;
-               char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
+               char *path = ceph_mdsc_build_path(dentry, &pathlen,
                                                  &base, 0);
 
                /* mark error on parent + clear complete */
@@ -1089,13 +1124,13 @@ static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
                ceph_dir_clear_complete(req->r_parent);
 
                /* drop the dentry -- we don't know its status */
-               if (!d_unhashed(req->r_dentry))
-                       d_drop(req->r_dentry);
+               if (!d_unhashed(dentry))
+                       d_drop(dentry);
 
                /* mark inode itself for an error (since metadata is bogus) */
                mapping_set_error(req->r_old_inode->i_mapping, result);
 
-               pr_warn("ceph: async unlink failure path=(%llx)%s result=%d!\n",
+               pr_warn("async unlink failure path=(%llx)%s result=%d!\n",
                        base, IS_ERR(path) ? "<<bad>>" : path, result);
                ceph_mdsc_free_path(path, pathlen);
        }
@@ -1180,6 +1215,8 @@ retry:
 
        if (try_async && op == CEPH_MDS_OP_UNLINK &&
            (req->r_dir_caps = get_caps_for_async_unlink(dir, dentry))) {
+               struct ceph_dentry_info *di = ceph_dentry(dentry);
+
                dout("async unlink on %llu/%.*s caps=%s", ceph_ino(dir),
                     dentry->d_name.len, dentry->d_name.name,
                     ceph_cap_string(req->r_dir_caps));
@@ -1187,6 +1224,16 @@ retry:
                req->r_callback = ceph_async_unlink_cb;
                req->r_old_inode = d_inode(dentry);
                ihold(req->r_old_inode);
+
+               spin_lock(&dentry->d_lock);
+               di->flags |= CEPH_DENTRY_ASYNC_UNLINK;
+               spin_unlock(&dentry->d_lock);
+
+               spin_lock(&fsc->async_unlink_conflict_lock);
+               hash_add_rcu(fsc->async_unlink_conflict, &di->hnode,
+                            dentry->d_name.hash);
+               spin_unlock(&fsc->async_unlink_conflict_lock);
+
                err = ceph_mdsc_submit_request(mdsc, dir, req);
                if (!err) {
                        /*
@@ -1195,10 +1242,20 @@ retry:
                         */
                        drop_nlink(inode);
                        d_delete(dentry);
-               } else if (err == -EJUKEBOX) {
-                       try_async = false;
-                       ceph_mdsc_put_request(req);
-                       goto retry;
+               } else {
+                       spin_lock(&fsc->async_unlink_conflict_lock);
+                       hash_del_rcu(&di->hnode);
+                       spin_unlock(&fsc->async_unlink_conflict_lock);
+
+                       spin_lock(&dentry->d_lock);
+                       di->flags &= ~CEPH_DENTRY_ASYNC_UNLINK;
+                       spin_unlock(&dentry->d_lock);
+
+                       if (err == -EJUKEBOX) {
+                               try_async = false;
+                               ceph_mdsc_put_request(req);
+                               goto retry;
+                       }
                }
        } else {
                set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
@@ -1237,6 +1294,10 @@ static int ceph_rename(struct user_namespace *mnt_userns, struct inode *old_dir,
            (!ceph_quota_is_same_realm(old_dir, new_dir)))
                return -EXDEV;
 
+       err = ceph_wait_on_conflict_unlink(new_dentry);
+       if (err)
+               return err;
+
        dout("rename dir %p dentry %p to dir %p dentry %p\n",
             old_dir, old_dentry, new_dir, new_dentry);
        req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
index da59e836a06eb7502c84ff0bb9d7a0f9e49a7b31..0f3424dc618bfe2da6d514c0a271f6ae37d277cf 100644 (file)
@@ -569,7 +569,7 @@ static void ceph_async_create_cb(struct ceph_mds_client *mdsc,
                char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
                                                  &base, 0);
 
-               pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n",
+               pr_warn("async create failure path=(%llx)%s result=%d!\n",
                        base, IS_ERR(path) ? "<<bad>>" : path, result);
                ceph_mdsc_free_path(path, pathlen);
 
@@ -740,6 +740,10 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
        if (dentry->d_name.len > NAME_MAX)
                return -ENAMETOOLONG;
 
+       err = ceph_wait_on_conflict_unlink(dentry);
+       if (err)
+               return err;
+
        if (flags & O_CREAT) {
                if (ceph_quota_is_max_files_exceeded(dir))
                        return -EDQUOT;
index 0aded10375fdd79c0a42f9287b83d4c5d8bd1b22..f6da80d110dca0de940acce92d0f95b6a135a840 100644 (file)
@@ -456,7 +456,7 @@ static int ceph_parse_deleg_inos(void **p, void *end,
                                dout("added delegated inode 0x%llx\n",
                                     start - 1);
                        } else if (err == -EBUSY) {
-                               pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
+                               pr_warn("MDS delegated inode 0x%llx more than once.\n",
                                        start - 1);
                        } else {
                                return err;
@@ -655,6 +655,79 @@ static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
        free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
 }
 
+/*
+ * In the async unlink case the kclient won't wait for the first reply
+ * from the MDS; it just drops all the links, unhashes the dentry and
+ * then succeeds immediately.
+ *
+ * For any new create/link/rename, etc. requests that follow using the
+ * same file name we must wait for the first reply of the inflight
+ * unlink request, or the MDS may fail these following requests with
+ * -EEXIST if the inflight async unlink request was delayed for some
+ * reason.
+ *
+ * And the worst case is that a non-async openc request will
+ * successfully open the file if the CDentry hasn't been unlinked yet,
+ * but later the previously delayed async unlink request will remove
+ * the CDentry. That means the just created file may be deleted later
+ * by accident.
+ *
+ * We need to wait for the inflight async unlink requests to finish
+ * when creating new files/directories by using the same file names.
+ *
+ * Returns 0 if there is no conflicting inflight async unlink or once
+ * it has completed, otherwise the non-zero value returned by
+ * wait_on_bit() (the wait is TASK_KILLABLE).
+ */
+int ceph_wait_on_conflict_unlink(struct dentry *dentry)
+{
+       struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
+       struct dentry *pdentry = dentry->d_parent;
+       struct dentry *udentry, *found = NULL;
+       struct ceph_dentry_info *di;
+       struct qstr dname;
+       u32 hash = dentry->d_name.hash;
+       int err;
+
+       dname.name = dentry->d_name.name;
+       dname.len = dentry->d_name.len;
+
+       /*
+        * Walk the bucket for this name hash looking for an inflight
+        * async unlink of the same name under the same parent.
+        */
+       rcu_read_lock();
+       hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
+                                  hnode, hash) {
+               udentry = di->dentry;
+
+               spin_lock(&udentry->d_lock);
+               if (udentry->d_name.hash != hash)
+                       goto next;
+               if (unlikely(udentry->d_parent != pdentry))
+                       goto next;
+               if (!hash_hashed(&di->hnode))
+                       goto next;
+
+               /* The warning is about the hashed entry, so report that one. */
+               if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
+                       pr_warn("%s dentry %p:%pd async unlink bit is not set\n",
+                               __func__, udentry, udentry);
+
+               if (!d_same_name(udentry, pdentry, &dname))
+                       goto next;
+
+               /*
+                * Grab the reference while ->d_lock is still held; once
+                * the lock is dropped the dentry may be on its way out
+                * and a plain dget() would be too late.
+                */
+               found = dget_dlock(udentry);
+               spin_unlock(&udentry->d_lock);
+               break;
+next:
+               spin_unlock(&udentry->d_lock);
+       }
+       rcu_read_unlock();
+
+       if (likely(!found))
+               return 0;
+
+       dout("%s dentry %p:%pd conflict with old %p:%pd\n", __func__,
+            dentry, dentry, found, found);
+
+       /*
+        * The loop was left via break, so di still refers to the info
+        * of 'found', which is pinned by the reference taken above.
+        */
+       err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
+                         TASK_KILLABLE);
+       dput(found);
+       return err;
+}
+
 
 /*
  * sessions
index 4620167f58eb911db2ead8cdad5e86df240f430d..d8ec2ac93da328b0a017a2346ebf192fa28d3cb8 100644 (file)
@@ -575,6 +575,7 @@ static inline int ceph_wait_on_async_create(struct inode *inode)
                           TASK_KILLABLE);
 }
 
+extern int ceph_wait_on_conflict_unlink(struct dentry *dentry);
 extern u64 ceph_get_deleg_ino(struct ceph_mds_session *session);
 extern int ceph_restore_deleg_ino(struct ceph_mds_session *session, u64 ino);
 #endif
index 40140805bdcfe924d2521c9eba068e7112a6c3ec..5539f6c87a45e3a8923bb7632e3cb1c9743f1545 100644 (file)
@@ -816,6 +816,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
        if (!fsc->cap_wq)
                goto fail_inode_wq;
 
+       hash_init(fsc->async_unlink_conflict);
+       spin_lock_init(&fsc->async_unlink_conflict_lock);
+
        spin_lock(&ceph_fsc_lock);
        list_add_tail(&fsc->metric_wakeup, &ceph_fsc_list);
        spin_unlock(&ceph_fsc_lock);
index f59dac66955bbdaee08a00257ead922ea7b03061..59469253592bd0944e1cee9a8ac6f9e925bb4dc2 100644 (file)
@@ -19,6 +19,7 @@
 #include <linux/security.h>
 #include <linux/netfs.h>
 #include <linux/fscache.h>
+#include <linux/hashtable.h>
 
 #include <linux/ceph/libceph.h>
 
@@ -99,6 +100,8 @@ struct ceph_mount_options {
        char *mon_addr;
 };
 
+#define CEPH_ASYNC_CREATE_CONFLICT_BITS 8
+
 struct ceph_fs_client {
        struct super_block *sb;
 
@@ -124,6 +127,9 @@ struct ceph_fs_client {
        struct workqueue_struct *inode_wq;
        struct workqueue_struct *cap_wq;
 
+       DECLARE_HASHTABLE(async_unlink_conflict, CEPH_ASYNC_CREATE_CONFLICT_BITS);
+       spinlock_t async_unlink_conflict_lock;
+
 #ifdef CONFIG_DEBUG_FS
        struct dentry *debugfs_dentry_lru, *debugfs_caps;
        struct dentry *debugfs_congestion_kb;
@@ -280,7 +286,8 @@ struct ceph_dentry_info {
        struct dentry *dentry;
        struct ceph_mds_session *lease_session;
        struct list_head lease_list;
-       unsigned flags;
+       struct hlist_node hnode;
+       unsigned long flags;
        int lease_shared_gen;
        u32 lease_gen;
        u32 lease_seq;
@@ -289,10 +296,12 @@ struct ceph_dentry_info {
        u64 offset;
 };
 
-#define CEPH_DENTRY_REFERENCED         1
-#define CEPH_DENTRY_LEASE_LIST         2
-#define CEPH_DENTRY_SHRINK_LIST                4
-#define CEPH_DENTRY_PRIMARY_LINK       8
+#define CEPH_DENTRY_REFERENCED         (1 << 0)
+#define CEPH_DENTRY_LEASE_LIST         (1 << 1)
+#define CEPH_DENTRY_SHRINK_LIST                (1 << 2)
+#define CEPH_DENTRY_PRIMARY_LINK       (1 << 3)
+#define CEPH_DENTRY_ASYNC_UNLINK_BIT   (4)
+#define CEPH_DENTRY_ASYNC_UNLINK       (1 << CEPH_DENTRY_ASYNC_UNLINK_BIT)
 
 struct ceph_inode_xattrs_info {
        /*