git.ipfire.org Git - thirdparty/linux.git/commitdiff
sunrpc: convert queue_lock from global spinlock to per-cache-detail lock
author: Jeff Layton <jlayton@kernel.org>
Mon, 23 Feb 2026 17:09:59 +0000 (12:09 -0500)
committer: Chuck Lever <chuck.lever@oracle.com>
Mon, 30 Mar 2026 01:25:09 +0000 (21:25 -0400)
The global queue_lock serializes all upcall queue operations across
every cache_detail instance. Convert it to a per-cache-detail spinlock
so that different caches (e.g. auth.unix.ip vs nfsd.fh) no longer
contend with each other on queue operations.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
include/linux/sunrpc/cache.h
net/sunrpc/cache.c

index e783132e481ff2593fdc5d323f7b3a08f85d4cd8..3d32dd1f7b05d35562d2064fed69877b3950fb51 100644 (file)
@@ -113,6 +113,7 @@ struct cache_detail {
 
        /* fields for communication over channel */
        struct list_head        queue;
+       spinlock_t              queue_lock;
 
        atomic_t                writers;                /* how many time is /channel open */
        time64_t                last_close;             /* if no writers, when did last close */
index 86b3fd5a429d77f7f917f398a02cb7a5ff8dd1e0..1cfaae488c6c67a9797511804e4bbba16bcc70ae 100644 (file)
@@ -400,6 +400,7 @@ void sunrpc_init_cache_detail(struct cache_detail *cd)
 {
        spin_lock_init(&cd->hash_lock);
        INIT_LIST_HEAD(&cd->queue);
+       spin_lock_init(&cd->queue_lock);
        spin_lock(&cache_list_lock);
        cd->nextcheck = 0;
        cd->entries = 0;
@@ -803,8 +804,6 @@ void cache_clean_deferred(void *owner)
  *
  */
 
-static DEFINE_SPINLOCK(queue_lock);
-
 struct cache_queue {
        struct list_head        list;
        int                     reader; /* if 0, then request */
@@ -847,7 +846,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
        inode_lock(inode); /* protect against multiple concurrent
                              * readers on this file */
  again:
-       spin_lock(&queue_lock);
+       spin_lock(&cd->queue_lock);
        /* need to find next request */
        while (rp->q.list.next != &cd->queue &&
               list_entry(rp->q.list.next, struct cache_queue, list)
@@ -856,7 +855,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
                list_move(&rp->q.list, next);
        }
        if (rp->q.list.next == &cd->queue) {
-               spin_unlock(&queue_lock);
+               spin_unlock(&cd->queue_lock);
                inode_unlock(inode);
                WARN_ON_ONCE(rp->offset);
                return 0;
@@ -865,7 +864,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
        WARN_ON_ONCE(rq->q.reader);
        if (rp->offset == 0)
                rq->readers++;
-       spin_unlock(&queue_lock);
+       spin_unlock(&cd->queue_lock);
 
        if (rq->len == 0) {
                err = cache_request(cd, rq);
@@ -876,9 +875,9 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
 
        if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) {
                err = -EAGAIN;
-               spin_lock(&queue_lock);
+               spin_lock(&cd->queue_lock);
                list_move(&rp->q.list, &rq->q.list);
-               spin_unlock(&queue_lock);
+               spin_unlock(&cd->queue_lock);
        } else {
                if (rp->offset + count > rq->len)
                        count = rq->len - rp->offset;
@@ -888,26 +887,26 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
                rp->offset += count;
                if (rp->offset >= rq->len) {
                        rp->offset = 0;
-                       spin_lock(&queue_lock);
+                       spin_lock(&cd->queue_lock);
                        list_move(&rp->q.list, &rq->q.list);
-                       spin_unlock(&queue_lock);
+                       spin_unlock(&cd->queue_lock);
                }
                err = 0;
        }
  out:
        if (rp->offset == 0) {
                /* need to release rq */
-               spin_lock(&queue_lock);
+               spin_lock(&cd->queue_lock);
                rq->readers--;
                if (rq->readers == 0 &&
                    !test_bit(CACHE_PENDING, &rq->item->flags)) {
                        list_del(&rq->q.list);
-                       spin_unlock(&queue_lock);
+                       spin_unlock(&cd->queue_lock);
                        cache_put(rq->item, cd);
                        kfree(rq->buf);
                        kfree(rq);
                } else
-                       spin_unlock(&queue_lock);
+                       spin_unlock(&cd->queue_lock);
        }
        if (err == -EAGAIN)
                goto again;
@@ -988,7 +987,7 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
        if (!rp)
                return mask;
 
-       spin_lock(&queue_lock);
+       spin_lock(&cd->queue_lock);
 
        for (cq= &rp->q; &cq->list != &cd->queue;
             cq = list_entry(cq->list.next, struct cache_queue, list))
@@ -996,7 +995,7 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
                        mask |= EPOLLIN | EPOLLRDNORM;
                        break;
                }
-       spin_unlock(&queue_lock);
+       spin_unlock(&cd->queue_lock);
        return mask;
 }
 
@@ -1011,7 +1010,7 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
        if (cmd != FIONREAD || !rp)
                return -EINVAL;
 
-       spin_lock(&queue_lock);
+       spin_lock(&cd->queue_lock);
 
        /* only find the length remaining in current request,
         * or the length of the next request
@@ -1024,7 +1023,7 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
                        len = cr->len - rp->offset;
                        break;
                }
-       spin_unlock(&queue_lock);
+       spin_unlock(&cd->queue_lock);
 
        return put_user(len, (int __user *)arg);
 }
@@ -1046,9 +1045,9 @@ static int cache_open(struct inode *inode, struct file *filp,
                rp->offset = 0;
                rp->q.reader = 1;
 
-               spin_lock(&queue_lock);
+               spin_lock(&cd->queue_lock);
                list_add(&rp->q.list, &cd->queue);
-               spin_unlock(&queue_lock);
+               spin_unlock(&cd->queue_lock);
        }
        if (filp->f_mode & FMODE_WRITE)
                atomic_inc(&cd->writers);
@@ -1064,7 +1063,7 @@ static int cache_release(struct inode *inode, struct file *filp,
        if (rp) {
                struct cache_request *rq = NULL;
 
-               spin_lock(&queue_lock);
+               spin_lock(&cd->queue_lock);
                if (rp->offset) {
                        struct cache_queue *cq;
                        for (cq = &rp->q; &cq->list != &cd->queue;
@@ -1086,7 +1085,7 @@ static int cache_release(struct inode *inode, struct file *filp,
                        rp->offset = 0;
                }
                list_del(&rp->q.list);
-               spin_unlock(&queue_lock);
+               spin_unlock(&cd->queue_lock);
 
                if (rq) {
                        cache_put(rq->item, cd);
@@ -1113,7 +1112,7 @@ static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
        struct cache_request *cr;
        LIST_HEAD(dequeued);
 
-       spin_lock(&queue_lock);
+       spin_lock(&detail->queue_lock);
        list_for_each_entry_safe(cq, tmp, &detail->queue, list)
                if (!cq->reader) {
                        cr = container_of(cq, struct cache_request, q);
@@ -1126,7 +1125,7 @@ static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
                                continue;
                        list_move(&cr->q.list, &dequeued);
                }
-       spin_unlock(&queue_lock);
+       spin_unlock(&detail->queue_lock);
        while (!list_empty(&dequeued)) {
                cr = list_entry(dequeued.next, struct cache_request, q.list);
                list_del(&cr->q.list);
@@ -1251,7 +1250,7 @@ static int cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
        crq->buf = buf;
        crq->len = 0;
        crq->readers = 0;
-       spin_lock(&queue_lock);
+       spin_lock(&detail->queue_lock);
        if (test_bit(CACHE_PENDING, &h->flags)) {
                crq->item = cache_get(h);
                list_add_tail(&crq->q.list, &detail->queue);
@@ -1259,7 +1258,7 @@ static int cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
        } else
                /* Lost a race, no longer PENDING, so don't enqueue */
                ret = -EAGAIN;
-       spin_unlock(&queue_lock);
+       spin_unlock(&detail->queue_lock);
        wake_up(&queue_wait);
        if (ret == -EAGAIN) {
                kfree(buf);