Merge branch 'for-6.0/dax' into libnvdimm-fixes

diff --git a/fs/btrfs/delayed-inode.c b/fs/btrfs/delayed-inode.c
index 66779ab3ed4a3f457bdd5617bce91b43c487af67..e7f34871a132898a64ceb5e4d16ad468b8602913 100644 (file)
@@ -52,18 +52,6 @@ static inline void btrfs_init_delayed_node(
        INIT_LIST_HEAD(&delayed_node->p_list);
 }
 
-static inline int btrfs_is_continuous_delayed_item(
-                                       struct btrfs_delayed_item *item1,
-                                       struct btrfs_delayed_item *item2)
-{
-       if (item1->key.type == BTRFS_DIR_INDEX_KEY &&
-           item1->key.objectid == item2->key.objectid &&
-           item1->key.type == item2->key.type &&
-           item1->key.offset + 1 == item2->key.offset)
-               return 1;
-       return 0;
-}
-
 static struct btrfs_delayed_node *btrfs_get_delayed_node(
                struct btrfs_inode *btrfs_inode)
 {
@@ -78,7 +66,7 @@ static struct btrfs_delayed_node *btrfs_get_delayed_node(
        }
 
        spin_lock(&root->inode_lock);
-       node = xa_load(&root->delayed_nodes, ino);
+       node = radix_tree_lookup(&root->delayed_nodes_tree, ino);
 
        if (node) {
                if (btrfs_inode->delayed_node) {
@@ -90,9 +78,9 @@ static struct btrfs_delayed_node *btrfs_get_delayed_node(
 
                /*
                 * It's possible that we're racing into the middle of removing
-                * this node from the xarray.  In this case, the refcount
+                * this node from the radix tree.  In this case, the refcount
                 * was zero and it should never go back to one.  Just return
-                * NULL like it was never in the xarray at all; our release
+                * NULL like it was never in the radix at all; our release
                 * function is in the process of removing it.
                 *
                 * Some implementations of refcount_inc refuse to bump the
@@ -100,7 +88,7 @@ static struct btrfs_delayed_node *btrfs_get_delayed_node(
                 * here, refcount_inc() may decide to just WARN_ONCE() instead
                 * of actually bumping the refcount.
                 *
-                * If this node is properly in the xarray, we want to bump the
+                * If this node is properly in the radix, we want to bump the
                 * refcount twice, once for the inode and once for this get
                 * operation.
                 */
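
Note on the refcount comment above: the lookup side relies on
refcount_inc_not_zero(), which only takes a reference if the count has not
already dropped to zero. A minimal sketch of that pattern, with
illustrative names (not a quote of the btrfs code):

    /*
     * Only take a reference while the object is still live; a plain
     * refcount_inc() may WARN rather than do a 0 -> 1 transition.
     */
    if (node && refcount_inc_not_zero(&node->refs))
            return node;    /* live: we now hold our own reference */
    return NULL;            /* being torn down: treat as absent */
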
@@ -128,30 +116,36 @@ static struct btrfs_delayed_node *btrfs_get_or_create_delayed_node(
        u64 ino = btrfs_ino(btrfs_inode);
        int ret;
 
-       do {
-               node = btrfs_get_delayed_node(btrfs_inode);
-               if (node)
-                       return node;
+again:
+       node = btrfs_get_delayed_node(btrfs_inode);
+       if (node)
+               return node;
 
-               node = kmem_cache_zalloc(delayed_node_cache, GFP_NOFS);
-               if (!node)
-                       return ERR_PTR(-ENOMEM);
-               btrfs_init_delayed_node(node, root, ino);
+       node = kmem_cache_zalloc(delayed_node_cache, GFP_NOFS);
+       if (!node)
+               return ERR_PTR(-ENOMEM);
+       btrfs_init_delayed_node(node, root, ino);
 
-               /* Cached in the inode and can be accessed */
-               refcount_set(&node->refs, 2);
+       /* cached in the btrfs inode and can be accessed */
+       refcount_set(&node->refs, 2);
 
-               spin_lock(&root->inode_lock);
-               ret = xa_insert(&root->delayed_nodes, ino, node, GFP_NOFS);
-               if (ret) {
-                       spin_unlock(&root->inode_lock);
-                       kmem_cache_free(delayed_node_cache, node);
-                       if (ret != -EBUSY)
-                               return ERR_PTR(ret);
-               }
-       } while (ret);
+       ret = radix_tree_preload(GFP_NOFS);
+       if (ret) {
+               kmem_cache_free(delayed_node_cache, node);
+               return ERR_PTR(ret);
+       }
+
+       spin_lock(&root->inode_lock);
+       ret = radix_tree_insert(&root->delayed_nodes_tree, ino, node);
+       if (ret == -EEXIST) {
+               spin_unlock(&root->inode_lock);
+               kmem_cache_free(delayed_node_cache, node);
+               radix_tree_preload_end();
+               goto again;
+       }
        btrfs_inode->delayed_node = node;
        spin_unlock(&root->inode_lock);
+       radix_tree_preload_end();
 
        return node;
 }
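
The replacement code above follows the classic radix-tree preload sequence:
preallocate nodes in a sleepable context, then insert under the spinlock. A
self-contained sketch, with hypothetical my_tree/my_lock/insert_entry names:

    #include <linux/radix-tree.h>
    #include <linux/spinlock.h>

    static RADIX_TREE(my_tree, GFP_ATOMIC);
    static DEFINE_SPINLOCK(my_lock);

    static int insert_entry(unsigned long index, void *entry)
    {
            int ret;

            /*
             * May sleep; preallocates tree nodes so the insert below,
             * done under a spinlock, never has to. On success it
             * returns with preemption disabled.
             */
            ret = radix_tree_preload(GFP_NOFS);
            if (ret)
                    return ret;

            spin_lock(&my_lock);
            ret = radix_tree_insert(&my_tree, index, entry); /* 0 or -EEXIST */
            spin_unlock(&my_lock);

            radix_tree_preload_end();       /* re-enables preemption */
            return ret;
    }
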
@@ -270,7 +264,8 @@ static void __btrfs_release_delayed_node(
                 * back up.  We can delete it now.
                 */
                ASSERT(refcount_read(&delayed_node->refs) == 0);
-               xa_erase(&root->delayed_nodes, delayed_node->inode_id);
+               radix_tree_delete(&root->delayed_nodes_tree,
+                                 delayed_node->inode_id);
                spin_unlock(&root->inode_lock);
                kmem_cache_free(delayed_node_cache, delayed_node);
        }
@@ -391,8 +386,7 @@ static struct btrfs_delayed_item *__btrfs_lookup_delayed_insertion_item(
 }
 
 static int __btrfs_add_delayed_item(struct btrfs_delayed_node *delayed_node,
-                                   struct btrfs_delayed_item *ins,
-                                   int action)
+                                   struct btrfs_delayed_item *ins)
 {
        struct rb_node **p, *node;
        struct rb_node *parent_node = NULL;
@@ -401,9 +395,9 @@ static int __btrfs_add_delayed_item(struct btrfs_delayed_node *delayed_node,
        int cmp;
        bool leftmost = true;
 
-       if (action == BTRFS_DELAYED_INSERTION_ITEM)
+       if (ins->ins_or_del == BTRFS_DELAYED_INSERTION_ITEM)
                root = &delayed_node->ins_root;
-       else if (action == BTRFS_DELAYED_DELETION_ITEM)
+       else if (ins->ins_or_del == BTRFS_DELAYED_DELETION_ITEM)
                root = &delayed_node->del_root;
        else
                BUG();
@@ -429,32 +423,19 @@ static int __btrfs_add_delayed_item(struct btrfs_delayed_node *delayed_node,
        rb_link_node(node, parent_node, p);
        rb_insert_color_cached(node, root, leftmost);
        ins->delayed_node = delayed_node;
-       ins->ins_or_del = action;
 
-       if (ins->key.type == BTRFS_DIR_INDEX_KEY &&
-           action == BTRFS_DELAYED_INSERTION_ITEM &&
+       /* Delayed items are always for dir index items. */
+       ASSERT(ins->key.type == BTRFS_DIR_INDEX_KEY);
+
+       if (ins->ins_or_del == BTRFS_DELAYED_INSERTION_ITEM &&
            ins->key.offset >= delayed_node->index_cnt)
-                       delayed_node->index_cnt = ins->key.offset + 1;
+               delayed_node->index_cnt = ins->key.offset + 1;
 
        delayed_node->count++;
        atomic_inc(&delayed_node->root->fs_info->delayed_root->items);
        return 0;
 }
 
-static int __btrfs_add_delayed_insertion_item(struct btrfs_delayed_node *node,
-                                             struct btrfs_delayed_item *item)
-{
-       return __btrfs_add_delayed_item(node, item,
-                                       BTRFS_DELAYED_INSERTION_ITEM);
-}
-
-static int __btrfs_add_delayed_deletion_item(struct btrfs_delayed_node *node,
-                                            struct btrfs_delayed_item *item)
-{
-       return __btrfs_add_delayed_item(node, item,
-                                       BTRFS_DELAYED_DELETION_ITEM);
-}
-
 static void finish_one_item(struct btrfs_delayed_root *delayed_root)
 {
        int seq = atomic_inc_return(&delayed_root->items_seq);
@@ -566,7 +547,13 @@ static int btrfs_delayed_item_reserve_metadata(struct btrfs_trans_handle *trans,
                trace_btrfs_space_reservation(fs_info, "delayed_item",
                                              item->key.objectid,
                                              num_bytes, 1);
-               item->bytes_reserved = num_bytes;
+               /*
+                * For insertions we track reserved metadata space by accounting
+                * for the number of leaves that will be used, based on the delayed
+                * node's index_item_leaves field.
+                */
+               if (item->ins_or_del == BTRFS_DELAYED_DELETION_ITEM)
+                       item->bytes_reserved = num_bytes;
        }
 
        return ret;
@@ -592,6 +579,21 @@ static void btrfs_delayed_item_release_metadata(struct btrfs_root *root,
        btrfs_block_rsv_release(fs_info, rsv, item->bytes_reserved, NULL);
 }
 
+static void btrfs_delayed_item_release_leaves(struct btrfs_delayed_node *node,
+                                             unsigned int num_leaves)
+{
+       struct btrfs_fs_info *fs_info = node->root->fs_info;
+       const u64 bytes = btrfs_calc_insert_metadata_size(fs_info, num_leaves);
+
+       /* There are no space reservations during log replay, bail out. */
+       if (test_bit(BTRFS_FS_LOG_RECOVERING, &fs_info->flags))
+               return;
+
+       trace_btrfs_space_reservation(fs_info, "delayed_item", node->inode_id,
+                                     bytes, 0);
+       btrfs_block_rsv_release(fs_info, &fs_info->delayed_block_rsv, bytes, NULL);
+}
+
 static int btrfs_delayed_inode_reserve_metadata(
                                        struct btrfs_trans_handle *trans,
                                        struct btrfs_root *root,
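
The byte count released by btrfs_delayed_item_release_leaves() above comes
from btrfs_calc_insert_metadata_size(), which charges a worst-case CoW path
per reserved unit. A paraphrased sketch of that math, with the constants
written out (for illustration only; the real helper lives in the btrfs
headers):

    #include <linux/types.h>

    /* nodesize * BTRFS_MAX_LEVEL (8) * 2, per reserved item/leaf unit */
    static u64 calc_insert_metadata_size(u32 nodesize, unsigned int num_items)
    {
            return (u64)nodesize * 8 * 2 * num_items;
    }

    /*
     * Example: with a 16K nodesize, releasing one leaf unit returns
     * 16384 * 8 * 2 bytes = 256 KiB to the delayed block reserve.
     */
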
@@ -665,22 +667,53 @@ static void btrfs_delayed_inode_release_metadata(struct btrfs_fs_info *fs_info,
 }
 
 /*
- * Insert a single delayed item or a batch of delayed items that have consecutive
- * keys if they exist.
+ * Insert a single delayed item or a batch of delayed items, as many as possible
+ * that fit in a leaf. The delayed items (dir index keys) are sorted by their key
+ * in the rbtree, and if there's a gap between two consecutive dir index items,
+ * then it means at some point we had delayed dir indexes to add but they got
+ * removed (by btrfs_delete_delayed_dir_index()) before we attempted to flush them
+ * into the subvolume tree. Dir index keys also have their offsets coming from a
+ * monotonically increasing counter, so we can't get new keys with an offset that
+ * fits within a gap between delayed dir index items.
  */
 static int btrfs_insert_delayed_item(struct btrfs_trans_handle *trans,
                                     struct btrfs_root *root,
                                     struct btrfs_path *path,
                                     struct btrfs_delayed_item *first_item)
 {
+       struct btrfs_fs_info *fs_info = root->fs_info;
+       struct btrfs_delayed_node *node = first_item->delayed_node;
        LIST_HEAD(item_list);
        struct btrfs_delayed_item *curr;
        struct btrfs_delayed_item *next;
-       const int max_size = BTRFS_LEAF_DATA_SIZE(root->fs_info);
+       const int max_size = BTRFS_LEAF_DATA_SIZE(fs_info);
        struct btrfs_item_batch batch;
        int total_size;
        char *ins_data = NULL;
        int ret;
+       bool continuous_keys_only = false;
+
+       lockdep_assert_held(&node->mutex);
+
+       /*
+        * During normal operation the delayed index offset is continuously
+        * increasing, so we can batch insert all items as there will not be any
+        * overlapping keys in the tree.
+        *
+        * The exception to this is log replay, where we may have interleaved
+        * offsets in the tree, so our batch needs to be continuous keys only in
+        * order to ensure we do not end up with out of order items in our leaf.
+        */
+       if (test_bit(BTRFS_FS_LOG_RECOVERING, &fs_info->flags))
+               continuous_keys_only = true;
+
+       /*
+        * For delayed items to insert, we track reserved metadata bytes based
+        * on the number of leaves that we will use.
+        * See btrfs_insert_delayed_dir_index() and
+        * btrfs_delayed_item_reserve_metadata().
+        */
+       ASSERT(first_item->bytes_reserved == 0);
 
        list_add_tail(&first_item->tree_list, &item_list);
        batch.total_data_size = first_item->data_len;
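
To make the gap rule concrete: suppose delayed insertions exist for dir
index offsets 10, 11, 14 and 15 (12 and 13 were deleted before the flush).
In normal operation offsets 12 and 13 can never reappear, so all four items
may share one batch; during log replay those keys may already exist in the
tree, so the batch has to stop at the gap. A host-side sketch of the batch
cut logic used by the loop below (illustrative, not kernel code):

    #include <stdbool.h>
    #include <stddef.h>

    struct demo_item { unsigned long long offset; int data_len; };

    /*
     * Returns the index of the first item NOT in the batch starting at
     * items[0] (n must be >= 1); item_hdr models sizeof(struct btrfs_item).
     */
    static size_t batch_end(const struct demo_item *items, size_t n,
                            int max_size, int item_hdr, bool gapless_only)
    {
            int total = items[0].data_len + item_hdr;
            size_t i;

            for (i = 1; i < n; i++) {
                    if (gapless_only &&
                        items[i].offset != items[i - 1].offset + 1)
                            break;  /* log replay: keys must be consecutive */
                    if (total + items[i].data_len + item_hdr > max_size)
                            break;  /* next item would overflow the leaf */
                    total += items[i].data_len + item_hdr;
            }
            return i;
    }
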
@@ -692,9 +725,19 @@ static int btrfs_insert_delayed_item(struct btrfs_trans_handle *trans,
                int next_size;
 
                next = __btrfs_next_delayed_item(curr);
-               if (!next || !btrfs_is_continuous_delayed_item(curr, next))
+               if (!next)
+                       break;
+
+               /*
+                * We cannot allow gaps in the key space if we're doing log
+                * replay.
+                */
+               if (continuous_keys_only &&
+                   (next->key.offset != curr->key.offset + 1))
                        break;
 
+               ASSERT(next->bytes_reserved == 0);
+
                next_size = next->data_len + sizeof(struct btrfs_item);
                if (total_size + next_size > max_size)
                        break;
@@ -751,9 +794,41 @@ static int btrfs_insert_delayed_item(struct btrfs_trans_handle *trans,
         */
        btrfs_release_path(path);
 
+       ASSERT(node->index_item_leaves > 0);
+
+       /*
+        * For normal operations we will batch an entire leaf's worth of delayed
+        * items, so if there are more items to process we can decrement
+        * index_item_leaves by 1 as we inserted 1 leaf's worth of items.
+        *
+        * However for log replay we may not have inserted an entire leaf's
+        * worth of items, we may have not had continuous items, so decrementing
+        * here would mess up the index_item_leaves accounting.  For this case
+        * only clean up the accounting when there are no items left.
+        */
+       if (next && !continuous_keys_only) {
+               /*
+                * We inserted one batch of items into a leaf and there are
+                * more items to flush in a future batch, so release one unit
+                * of metadata space from the delayed block reserve,
+                * corresponding to the leaf we just flushed to.
+                */
+               btrfs_delayed_item_release_leaves(node, 1);
+               node->index_item_leaves--;
+       } else if (!next) {
+               /*
+                * There are no more items to insert. We can have a number of
+                * reserved leaves > 1 here - this happens when many dir index
+                * items are added and then removed before they are flushed (file
+                * names with a very short life, never span a transaction). So
+                * release all remaining leaves.
+                */
+               btrfs_delayed_item_release_leaves(node, node->index_item_leaves);
+               node->index_item_leaves = 0;
+       }
+
        list_for_each_entry_safe(curr, next, &item_list, tree_list) {
                list_del(&curr->tree_list);
-               btrfs_delayed_item_release_metadata(root, curr);
                btrfs_release_delayed_item(curr);
        }
 out:
@@ -789,62 +864,75 @@ static int btrfs_batch_delete_items(struct btrfs_trans_handle *trans,
                                    struct btrfs_path *path,
                                    struct btrfs_delayed_item *item)
 {
+       struct btrfs_fs_info *fs_info = root->fs_info;
        struct btrfs_delayed_item *curr, *next;
-       struct extent_buffer *leaf;
-       struct btrfs_key key;
-       struct list_head head;
-       int nitems, i, last_item;
-       int ret = 0;
+       struct extent_buffer *leaf = path->nodes[0];
+       LIST_HEAD(batch_list);
+       int nitems, slot, last_slot;
+       int ret;
+       u64 total_reserved_size = item->bytes_reserved;
 
-       BUG_ON(!path->nodes[0]);
+       ASSERT(leaf != NULL);
 
-       leaf = path->nodes[0];
+       slot = path->slots[0];
+       last_slot = btrfs_header_nritems(leaf) - 1;
+       /*
+        * Our caller always gives us a path pointing to an existing item, so
+        * this cannot happen.
+        */
+       ASSERT(slot <= last_slot);
+       if (WARN_ON(slot > last_slot))
+               return -ENOENT;
 
-       i = path->slots[0];
-       last_item = btrfs_header_nritems(leaf) - 1;
-       if (i > last_item)
-               return -ENOENT; /* FIXME: Is errno suitable? */
+       nitems = 1;
+       curr = item;
+       list_add_tail(&curr->tree_list, &batch_list);
 
-       next = item;
-       INIT_LIST_HEAD(&head);
-       btrfs_item_key_to_cpu(leaf, &key, i);
-       nitems = 0;
        /*
-        * count the number of the dir index items that we can delete in batch
+        * Keep checking if the next delayed item matches the next item in the
+        * leaf - if so, we can add it to the batch of items to delete from the
+        * leaf.
         */
-       while (btrfs_comp_cpu_keys(&next->key, &key) == 0) {
-               list_add_tail(&next->tree_list, &head);
-               nitems++;
+       while (slot < last_slot) {
+               struct btrfs_key key;
 
-               curr = next;
                next = __btrfs_next_delayed_item(curr);
                if (!next)
                        break;
 
-               if (!btrfs_is_continuous_delayed_item(curr, next))
-                       break;
-
-               i++;
-               if (i > last_item)
+               slot++;
+               btrfs_item_key_to_cpu(leaf, &key, slot);
+               if (btrfs_comp_cpu_keys(&next->key, &key) != 0)
                        break;
-               btrfs_item_key_to_cpu(leaf, &key, i);
+               nitems++;
+               curr = next;
+               list_add_tail(&curr->tree_list, &batch_list);
+               total_reserved_size += curr->bytes_reserved;
        }
 
-       if (!nitems)
-               return 0;
-
        ret = btrfs_del_items(trans, root, path, path->slots[0], nitems);
        if (ret)
-               goto out;
+               return ret;
+
+       /* In case of BTRFS_FS_LOG_RECOVERING, items won't have reserved space */
+       if (total_reserved_size > 0) {
+               /*
+                * Check btrfs_delayed_item_reserve_metadata() to see why we
+                * don't need to release/reserve qgroup space.
+                */
+               trace_btrfs_space_reservation(fs_info, "delayed_item",
+                                             item->key.objectid, total_reserved_size,
+                                             0);
+               btrfs_block_rsv_release(fs_info, &fs_info->delayed_block_rsv,
+                                       total_reserved_size, NULL);
+       }
 
-       list_for_each_entry_safe(curr, next, &head, tree_list) {
-               btrfs_delayed_item_release_metadata(root, curr);
+       list_for_each_entry_safe(curr, next, &batch_list, tree_list) {
                list_del(&curr->tree_list);
                btrfs_release_delayed_item(curr);
        }
 
-out:
-       return ret;
+       return 0;
 }
 
 static int btrfs_delete_delayed_items(struct btrfs_trans_handle *trans,
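
The leaf walk in the new btrfs_batch_delete_items() pairs each delayed item
with the next physical slot and compares keys with btrfs_comp_cpu_keys(),
which orders by (objectid, type, offset) and returns -1, 0 or 1. A small
usage sketch, assuming the btrfs internal headers and a hypothetical inode
number:

    const u64 ino = 257;    /* hypothetical inode number */
    struct btrfs_key a = { .objectid = ino, .type = BTRFS_DIR_INDEX_KEY,
                           .offset = 10 };
    struct btrfs_key b = { .objectid = ino, .type = BTRFS_DIR_INDEX_KEY,
                           .offset = 11 };

    ASSERT(btrfs_comp_cpu_keys(&a, &b) < 0);    /* a sorts before b */
    ASSERT(btrfs_comp_cpu_keys(&a, &a) == 0);   /* only 0 means same key */
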
@@ -852,43 +940,52 @@ static int btrfs_delete_delayed_items(struct btrfs_trans_handle *trans,
                                      struct btrfs_root *root,
                                      struct btrfs_delayed_node *node)
 {
-       struct btrfs_delayed_item *curr, *prev;
        int ret = 0;
 
-do_again:
-       mutex_lock(&node->mutex);
-       curr = __btrfs_first_delayed_deletion_item(node);
-       if (!curr)
-               goto delete_fail;
+       while (ret == 0) {
+               struct btrfs_delayed_item *item;
+
+               mutex_lock(&node->mutex);
+               item = __btrfs_first_delayed_deletion_item(node);
+               if (!item) {
+                       mutex_unlock(&node->mutex);
+                       break;
+               }
+
+               ret = btrfs_search_slot(trans, root, &item->key, path, -1, 1);
+               if (ret > 0) {
+                       /*
+                        * There's no matching item in the leaf. This means we
+                        * have already deleted this item in a past run of the
+                        * delayed items. We ignore errors when running delayed
+                        * items from an async context, through a work queue job
+                        * running btrfs_async_run_delayed_root(), and don't
+                        * release delayed items that failed to complete. This
+                        * is because we will retry later, and at transaction
+                        * commit time we always run delayed items and will
+                        * then deal with errors if they fail to run again.
+                        *
+                        * So just release delayed items for which we can't find
+                        * an item in the tree, and move to the next item.
+                        */
+                       btrfs_release_path(path);
+                       btrfs_release_delayed_item(item);
+                       ret = 0;
+               } else if (ret == 0) {
+                       ret = btrfs_batch_delete_items(trans, root, path, item);
+                       btrfs_release_path(path);
+               }
 
-       ret = btrfs_search_slot(trans, root, &curr->key, path, -1, 1);
-       if (ret < 0)
-               goto delete_fail;
-       else if (ret > 0) {
                /*
-                * can't find the item which the node points to, so this node
-                * is invalid, just drop it.
+                * We unlock and relock on each iteration, this is to prevent
+                * blocking other tasks for too long while we are being run from
+                * the async context (work queue job). Those tasks are typically
+                * running system calls like creat/mkdir/rename/unlink/etc which
+                * need to add delayed items to this delayed node.
                 */
-               prev = curr;
-               curr = __btrfs_next_delayed_item(prev);
-               btrfs_release_delayed_item(prev);
-               ret = 0;
-               btrfs_release_path(path);
-               if (curr) {
-                       mutex_unlock(&node->mutex);
-                       goto do_again;
-               } else
-                       goto delete_fail;
+               mutex_unlock(&node->mutex);
        }
 
-       btrfs_batch_delete_items(trans, root, path, curr);
-       btrfs_release_path(path);
-       mutex_unlock(&node->mutex);
-       goto do_again;
-
-delete_fail:
-       btrfs_release_path(path);
-       mutex_unlock(&node->mutex);
        return ret;
 }
 
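
The unlock/relock per iteration above is the usual way to bound lock hold
times while draining a queue. The skeleton of the pattern, with
hypothetical pick_first_item()/process_one_item() helpers:

    while (ret == 0) {
            mutex_lock(&node->mutex);
            item = pick_first_item(node);           /* hypothetical */
            if (!item) {
                    mutex_unlock(&node->mutex);
                    break;
            }
            ret = process_one_item(item);           /* hypothetical */
            /*
             * Drop the lock between items so tasks queueing new delayed
             * items (creat/mkdir/rename/unlink/...) can make progress.
             */
            mutex_unlock(&node->mutex);
    }
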
@@ -1347,9 +1444,13 @@ int btrfs_insert_delayed_dir_index(struct btrfs_trans_handle *trans,
                                   struct btrfs_disk_key *disk_key, u8 type,
                                   u64 index)
 {
+       struct btrfs_fs_info *fs_info = trans->fs_info;
+       const unsigned int leaf_data_size = BTRFS_LEAF_DATA_SIZE(fs_info);
        struct btrfs_delayed_node *delayed_node;
        struct btrfs_delayed_item *delayed_item;
        struct btrfs_dir_item *dir_item;
+       bool reserve_leaf_space;
+       u32 data_len;
        int ret;
 
        delayed_node = btrfs_get_or_create_delayed_node(dir);
@@ -1365,6 +1466,7 @@ int btrfs_insert_delayed_dir_index(struct btrfs_trans_handle *trans,
        delayed_item->key.objectid = btrfs_ino(dir);
        delayed_item->key.type = BTRFS_DIR_INDEX_KEY;
        delayed_item->key.offset = index;
+       delayed_item->ins_or_del = BTRFS_DELAYED_INSERTION_ITEM;
 
        dir_item = (struct btrfs_dir_item *)delayed_item->data;
        dir_item->location = *disk_key;
@@ -1374,15 +1476,52 @@ int btrfs_insert_delayed_dir_index(struct btrfs_trans_handle *trans,
        btrfs_set_stack_dir_type(dir_item, type);
        memcpy((char *)(dir_item + 1), name, name_len);
 
-       ret = btrfs_delayed_item_reserve_metadata(trans, dir->root, delayed_item);
-       /*
-        * we have reserved enough space when we start a new transaction,
-        * so reserving metadata failure is impossible
-        */
-       BUG_ON(ret);
+       data_len = delayed_item->data_len + sizeof(struct btrfs_item);
 
        mutex_lock(&delayed_node->mutex);
-       ret = __btrfs_add_delayed_insertion_item(delayed_node, delayed_item);
+
+       if (delayed_node->index_item_leaves == 0 ||
+           delayed_node->curr_index_batch_size + data_len > leaf_data_size) {
+               delayed_node->curr_index_batch_size = data_len;
+               reserve_leaf_space = true;
+       } else {
+               delayed_node->curr_index_batch_size += data_len;
+               reserve_leaf_space = false;
+       }
+
+       if (reserve_leaf_space) {
+               ret = btrfs_delayed_item_reserve_metadata(trans, dir->root,
+                                                         delayed_item);
+               /*
+                * Space was reserved for a dir index item insertion when we
+                * started the transaction, so getting a failure here should be
+                * impossible.
+                */
+               if (WARN_ON(ret)) {
+                       mutex_unlock(&delayed_node->mutex);
+                       btrfs_release_delayed_item(delayed_item);
+                       goto release_node;
+               }
+
+               delayed_node->index_item_leaves++;
+       } else if (!test_bit(BTRFS_FS_LOG_RECOVERING, &fs_info->flags)) {
+               const u64 bytes = btrfs_calc_insert_metadata_size(fs_info, 1);
+
+               /*
+                * Adding the new dir index item does not require touching another
+                * leaf, so we can release 1 unit of metadata that was previously
+                * reserved when starting the transaction. This applies only to
+                * the case where we had a transaction start and excludes the
+                * transaction join case (when replaying log trees).
+                */
+               trace_btrfs_space_reservation(fs_info, "transaction",
+                                             trans->transid, bytes, 0);
+               btrfs_block_rsv_release(fs_info, trans->block_rsv, bytes, NULL);
+               ASSERT(trans->bytes_reserved >= bytes);
+               trans->bytes_reserved -= bytes;
+       }
+
+       ret = __btrfs_add_delayed_item(delayed_node, delayed_item);
        if (unlikely(ret)) {
                btrfs_err(trans->fs_info,
                          "err add delayed dir index item(name: %.*s) into the insertion tree of the delayed node(root id: %llu, inode id: %llu, errno: %d)",
@@ -1410,8 +1549,37 @@ static int btrfs_delete_delayed_insertion_item(struct btrfs_fs_info *fs_info,
                return 1;
        }
 
-       btrfs_delayed_item_release_metadata(node->root, item);
+       /*
+        * For delayed items to insert, we track reserved metadata bytes based
+        * on the number of leaves that we will use.
+        * See btrfs_insert_delayed_dir_index() and
+        * btrfs_delayed_item_reserve_metadata().
+        */
+       ASSERT(item->bytes_reserved == 0);
+       ASSERT(node->index_item_leaves > 0);
+
+       /*
+        * If there's only one leaf reserved, we can decrement this item from the
+        * current batch, otherwise we cannot because we don't know which leaf
+        * it belongs to. With the current limit on delayed items, we rarely
+        * accumulate enough dir index items to fill more than one leaf (even
+        * when using a leaf size of 4K).
+        */
+       if (node->index_item_leaves == 1) {
+               const u32 data_len = item->data_len + sizeof(struct btrfs_item);
+
+               ASSERT(node->curr_index_batch_size >= data_len);
+               node->curr_index_batch_size -= data_len;
+       }
+
        btrfs_release_delayed_item(item);
+
+       /* If we now have no more dir index items, we can release all leaves. */
+       if (RB_EMPTY_ROOT(&node->ins_root.rb_root)) {
+               btrfs_delayed_item_release_leaves(node, node->index_item_leaves);
+               node->index_item_leaves = 0;
+       }
+
        mutex_unlock(&node->mutex);
        return 0;
 }
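
Both hunks above size a dir index entry as data_len plus
sizeof(struct btrfs_item). A back-of-the-envelope, using approximate
on-disk sizes from the current format (btrfs_item is 25 bytes, a bare
btrfs_dir_item 30 bytes, the leaf header 101 bytes):

    /* Approximate cost of one delayed dir index entry inside a leaf. */
    static u32 dir_index_cost(u16 name_len)
    {
            return 30 /* dir_item */ + name_len + 25 /* item header */;
    }

    /*
     * With a 16K nodesize the leaf data area is about 16384 - 101 =
     * 16283 bytes, so roughly 16283 / dir_index_cost(12) ~= 240
     * twelve-character names fit in one reserved leaf unit before
     * another leaf has to be reserved.
     */
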
@@ -1444,6 +1612,7 @@ int btrfs_delete_delayed_dir_index(struct btrfs_trans_handle *trans,
        }
 
        item->key = item_key;
+       item->ins_or_del = BTRFS_DELAYED_DELETION_ITEM;
 
        ret = btrfs_delayed_item_reserve_metadata(trans, dir->root, item);
        /*
@@ -1458,7 +1627,7 @@ int btrfs_delete_delayed_dir_index(struct btrfs_trans_handle *trans,
        }
 
        mutex_lock(&node->mutex);
-       ret = __btrfs_add_delayed_deletion_item(node, item);
+       ret = __btrfs_add_delayed_item(node, item);
        if (unlikely(ret)) {
                btrfs_err(trans->fs_info,
                          "err add delayed dir index item(index: %llu) into the deletion tree of the delayed node(root id: %llu, inode id: %llu, errno: %d)",
@@ -1826,12 +1995,17 @@ static void __btrfs_kill_delayed_node(struct btrfs_delayed_node *delayed_node)
        mutex_lock(&delayed_node->mutex);
        curr_item = __btrfs_first_delayed_insertion_item(delayed_node);
        while (curr_item) {
-               btrfs_delayed_item_release_metadata(root, curr_item);
                prev_item = curr_item;
                curr_item = __btrfs_next_delayed_item(prev_item);
                btrfs_release_delayed_item(prev_item);
        }
 
+       if (delayed_node->index_item_leaves > 0) {
+               btrfs_delayed_item_release_leaves(delayed_node,
+                                         delayed_node->index_item_leaves);
+               delayed_node->index_item_leaves = 0;
+       }
+
        curr_item = __btrfs_first_delayed_deletion_item(delayed_node);
        while (curr_item) {
                btrfs_delayed_item_release_metadata(root, curr_item);
@@ -1863,35 +2037,34 @@ void btrfs_kill_delayed_inode_items(struct btrfs_inode *inode)
 
 void btrfs_kill_all_delayed_nodes(struct btrfs_root *root)
 {
-       unsigned long index = 0;
-       struct btrfs_delayed_node *delayed_node;
+       u64 inode_id = 0;
        struct btrfs_delayed_node *delayed_nodes[8];
+       int i, n;
 
        while (1) {
-               int n = 0;
-
                spin_lock(&root->inode_lock);
-               if (xa_empty(&root->delayed_nodes)) {
+               n = radix_tree_gang_lookup(&root->delayed_nodes_tree,
+                                          (void **)delayed_nodes, inode_id,
+                                          ARRAY_SIZE(delayed_nodes));
+               if (!n) {
                        spin_unlock(&root->inode_lock);
-                       return;
+                       break;
                }
 
-               xa_for_each_start(&root->delayed_nodes, index, delayed_node, index) {
+               inode_id = delayed_nodes[n - 1]->inode_id + 1;
+               for (i = 0; i < n; i++) {
                        /*
                         * Don't increase refs in case the node is dead and
                         * about to be removed from the tree in the loop below
                         */
-                       if (refcount_inc_not_zero(&delayed_node->refs)) {
-                               delayed_nodes[n] = delayed_node;
-                               n++;
-                       }
-                       if (n >= ARRAY_SIZE(delayed_nodes))
-                               break;
+                       if (!refcount_inc_not_zero(&delayed_nodes[i]->refs))
+                               delayed_nodes[i] = NULL;
                }
-               index++;
                spin_unlock(&root->inode_lock);
 
-               for (int i = 0; i < n; i++) {
+               for (i = 0; i < n; i++) {
+                       if (!delayed_nodes[i])
+                               continue;
                        __btrfs_kill_delayed_node(delayed_nodes[i]);
                        btrfs_release_delayed_node(delayed_nodes[i]);
                }