sunrpc: split cache_detail queue into request and reader lists
author Jeff Layton <jlayton@kernel.org>
Mon, 23 Feb 2026 17:10:01 +0000 (12:10 -0500)
committer Chuck Lever <chuck.lever@oracle.com>
Mon, 30 Mar 2026 01:25:09 +0000 (21:25 -0400)
Replace the single interleaved queue (which mixed cache_request and
cache_reader entries distinguished by a ->reader flag) with two
dedicated lists: cd->requests for upcall requests and cd->readers
for open file handles.

Readers now track their progress via a monotonically increasing
sequence number (next_seqno) rather than by their position in the
shared list. Each cache_request is assigned a seqno when enqueued,
and a new cache_next_request() helper finds the next request at or
after a given seqno.

This eliminates the cache_queue wrapper struct entirely, simplifies
the reader-skipping loops in cache_read/cache_poll/cache_ioctl/
cache_release, and makes the data flow easier to reason about.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
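
For illustration only, here is a minimal, self-contained userspace C
sketch of the seqno-cursor pattern the message describes. The struct
names, the singly linked list, and the next_request_at() helper are
invented stand-ins for cache_request, cache_reader, and
cache_next_request(); only the "first request at or after seqno" scan
and the cursor advance mirror the patch.

	#include <stdint.h>
	#include <stdio.h>

	/* Invented stand-ins for cache_request and cache_reader. */
	struct request {
		struct request *next;	/* singly linked; the kernel uses list_head */
		uint64_t seqno;		/* assigned from a monotonic counter on enqueue */
		const char *payload;
	};

	struct reader_cursor {
		uint64_t next_seqno;	/* first seqno this reader has not yet consumed */
	};

	/* Same scan as cache_next_request(): first request at or after seqno. */
	static struct request *next_request_at(struct request *head, uint64_t seqno)
	{
		struct request *rq;

		for (rq = head; rq; rq = rq->next)
			if (rq->seqno >= seqno)
				return rq;
		return NULL;
	}

	int main(void)
	{
		struct request r2 = { NULL, 2, "two" };
		struct request r1 = { &r2,  1, "one" };
		struct request r0 = { &r1,  0, "zero" };
		struct reader_cursor rc = { 0 };
		struct request *rq;

		/* Consuming a request advances the cursor past its seqno,
		 * the analogue of rp->next_seqno = rq->seqno + 1 below. */
		while ((rq = next_request_at(&r0, rc.next_seqno)) != NULL) {
			printf("read %s (seqno %llu)\n", rq->payload,
			       (unsigned long long)rq->seqno);
			rc.next_seqno = rq->seqno + 1;
		}
		return 0;
	}

Because a reader's place is a sequence number rather than a pointer
into the list, removing already-consumed requests cannot invalidate
any cursor; that property is what allows dropping the cache_queue
wrapper and the reader-skipping loops. A u64 counter bumped once per
upcall will not wrap in practice.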
include/linux/sunrpc/cache.h
net/sunrpc/cache.c

diff --git a/include/linux/sunrpc/cache.h b/include/linux/sunrpc/cache.h
index 031379efba24d40f64ce346cf1032261d4b98d05..b1e595c2615bd4be4d9ad19f71a8f4d08bd74a9b 100644
--- a/include/linux/sunrpc/cache.h
+++ b/include/linux/sunrpc/cache.h
@@ -113,9 +113,11 @@ struct cache_detail {
        int                     entries;
 
        /* fields for communication over channel */
-       struct list_head        queue;
+       struct list_head        requests;
+       struct list_head        readers;
        spinlock_t              queue_lock;
        wait_queue_head_t       queue_wait;
+       u64                     next_seqno;
 
        atomic_t                writers;                /* how many time is /channel open */
        time64_t                last_close;             /* if no writers, when did last close */
diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
index fd02dca1f07afec2f09c591037bac3ea3e8d7e17..7081c1214e6c3226f8ac82c8bc7ff6c36f598744 100644
--- a/net/sunrpc/cache.c
+++ b/net/sunrpc/cache.c
@@ -399,9 +399,11 @@ static struct delayed_work cache_cleaner;
 void sunrpc_init_cache_detail(struct cache_detail *cd)
 {
        spin_lock_init(&cd->hash_lock);
-       INIT_LIST_HEAD(&cd->queue);
+       INIT_LIST_HEAD(&cd->requests);
+       INIT_LIST_HEAD(&cd->readers);
        spin_lock_init(&cd->queue_lock);
        init_waitqueue_head(&cd->queue_wait);
+       cd->next_seqno = 0;
        spin_lock(&cache_list_lock);
        cd->nextcheck = 0;
        cd->entries = 0;
@@ -796,29 +798,20 @@ void cache_clean_deferred(void *owner)
  * On read, you get a full request, or block.
  * On write, an update request is processed.
  * Poll works if anything to read, and always allows write.
- *
- * Implemented by linked list of requests.  Each open file has
- * a ->private that also exists in this list.  New requests are added
- * to the end and may wakeup and preceding readers.
- * New readers are added to the head.  If, on read, an item is found with
- * CACHE_UPCALLING clear, we free it from the list.
- *
  */
 
-struct cache_queue {
-       struct list_head        list;
-       int                     reader; /* if 0, then request */
-};
 struct cache_request {
-       struct cache_queue      q;
+       struct list_head        list;
        struct cache_head       *item;
-       char                    * buf;
+       char                    *buf;
        int                     len;
        int                     readers;
+       u64                     seqno;
 };
 struct cache_reader {
-       struct cache_queue      q;
+       struct list_head        list;
        int                     offset; /* if non-0, we have a refcnt on next request */
+       u64                     next_seqno;
 };
 
 static int cache_request(struct cache_detail *detail,
@@ -833,6 +826,17 @@ static int cache_request(struct cache_detail *detail,
        return PAGE_SIZE - len;
 }
 
+static struct cache_request *
+cache_next_request(struct cache_detail *cd, u64 seqno)
+{
+       struct cache_request *rq;
+
+       list_for_each_entry(rq, &cd->requests, list)
+               if (rq->seqno >= seqno)
+                       return rq;
+       return NULL;
+}
+
 static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
                          loff_t *ppos, struct cache_detail *cd)
 {
@@ -849,20 +853,13 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
  again:
        spin_lock(&cd->queue_lock);
        /* need to find next request */
-       while (rp->q.list.next != &cd->queue &&
-              list_entry(rp->q.list.next, struct cache_queue, list)
-              ->reader) {
-               struct list_head *next = rp->q.list.next;
-               list_move(&rp->q.list, next);
-       }
-       if (rp->q.list.next == &cd->queue) {
+       rq = cache_next_request(cd, rp->next_seqno);
+       if (!rq) {
                spin_unlock(&cd->queue_lock);
                inode_unlock(inode);
                WARN_ON_ONCE(rp->offset);
                return 0;
        }
-       rq = container_of(rp->q.list.next, struct cache_request, q.list);
-       WARN_ON_ONCE(rq->q.reader);
        if (rp->offset == 0)
                rq->readers++;
        spin_unlock(&cd->queue_lock);
@@ -876,9 +873,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
 
        if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) {
                err = -EAGAIN;
-               spin_lock(&cd->queue_lock);
-               list_move(&rp->q.list, &rq->q.list);
-               spin_unlock(&cd->queue_lock);
+               rp->next_seqno = rq->seqno + 1;
        } else {
                if (rp->offset + count > rq->len)
                        count = rq->len - rp->offset;
@@ -888,9 +883,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
                rp->offset += count;
                if (rp->offset >= rq->len) {
                        rp->offset = 0;
-                       spin_lock(&cd->queue_lock);
-                       list_move(&rp->q.list, &rq->q.list);
-                       spin_unlock(&cd->queue_lock);
+                       rp->next_seqno = rq->seqno + 1;
                }
                err = 0;
        }
@@ -901,7 +894,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
                rq->readers--;
                if (rq->readers == 0 &&
                    !test_bit(CACHE_PENDING, &rq->item->flags)) {
-                       list_del(&rq->q.list);
+                       list_del(&rq->list);
                        spin_unlock(&cd->queue_lock);
                        cache_put(rq->item, cd);
                        kfree(rq->buf);
@@ -976,7 +969,6 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
 {
        __poll_t mask;
        struct cache_reader *rp = filp->private_data;
-       struct cache_queue *cq;
 
        poll_wait(filp, &cd->queue_wait, wait);
 
@@ -988,12 +980,8 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
 
        spin_lock(&cd->queue_lock);
 
-       for (cq= &rp->q; &cq->list != &cd->queue;
-            cq = list_entry(cq->list.next, struct cache_queue, list))
-               if (!cq->reader) {
-                       mask |= EPOLLIN | EPOLLRDNORM;
-                       break;
-               }
+       if (cache_next_request(cd, rp->next_seqno))
+               mask |= EPOLLIN | EPOLLRDNORM;
        spin_unlock(&cd->queue_lock);
        return mask;
 }
@@ -1004,7 +992,7 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
 {
        int len = 0;
        struct cache_reader *rp = filp->private_data;
-       struct cache_queue *cq;
+       struct cache_request *rq;
 
        if (cmd != FIONREAD || !rp)
                return -EINVAL;
@@ -1014,14 +1002,9 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
        /* only find the length remaining in current request,
         * or the length of the next request
         */
-       for (cq= &rp->q; &cq->list != &cd->queue;
-            cq = list_entry(cq->list.next, struct cache_queue, list))
-               if (!cq->reader) {
-                       struct cache_request *cr =
-                               container_of(cq, struct cache_request, q);
-                       len = cr->len - rp->offset;
-                       break;
-               }
+       rq = cache_next_request(cd, rp->next_seqno);
+       if (rq)
+               len = rq->len - rp->offset;
        spin_unlock(&cd->queue_lock);
 
        return put_user(len, (int __user *)arg);
@@ -1042,10 +1025,10 @@ static int cache_open(struct inode *inode, struct file *filp,
                        return -ENOMEM;
                }
                rp->offset = 0;
-               rp->q.reader = 1;
+               rp->next_seqno = 0;
 
                spin_lock(&cd->queue_lock);
-               list_add(&rp->q.list, &cd->queue);
+               list_add(&rp->list, &cd->readers);
                spin_unlock(&cd->queue_lock);
        }
        if (filp->f_mode & FMODE_WRITE)
@@ -1064,26 +1047,21 @@ static int cache_release(struct inode *inode, struct file *filp,
 
                spin_lock(&cd->queue_lock);
                if (rp->offset) {
-                       struct cache_queue *cq;
-                       for (cq = &rp->q; &cq->list != &cd->queue;
-                            cq = list_entry(cq->list.next,
-                                            struct cache_queue, list))
-                               if (!cq->reader) {
-                                       struct cache_request *cr =
-                                               container_of(cq,
-                                               struct cache_request, q);
-                                       cr->readers--;
-                                       if (cr->readers == 0 &&
-                                           !test_bit(CACHE_PENDING,
-                                                     &cr->item->flags)) {
-                                               list_del(&cr->q.list);
-                                               rq = cr;
-                                       }
-                                       break;
+                       struct cache_request *cr;
+
+                       cr = cache_next_request(cd, rp->next_seqno);
+                       if (cr) {
+                               cr->readers--;
+                               if (cr->readers == 0 &&
+                                   !test_bit(CACHE_PENDING,
+                                             &cr->item->flags)) {
+                                       list_del(&cr->list);
+                                       rq = cr;
                                }
+                       }
                        rp->offset = 0;
                }
-               list_del(&rp->q.list);
+               list_del(&rp->list);
                spin_unlock(&cd->queue_lock);
 
                if (rq) {
@@ -1107,27 +1085,24 @@ static int cache_release(struct inode *inode, struct file *filp,
 
 static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
 {
-       struct cache_queue *cq, *tmp;
-       struct cache_request *cr;
+       struct cache_request *cr, *tmp;
        LIST_HEAD(dequeued);
 
        spin_lock(&detail->queue_lock);
-       list_for_each_entry_safe(cq, tmp, &detail->queue, list)
-               if (!cq->reader) {
-                       cr = container_of(cq, struct cache_request, q);
-                       if (cr->item != ch)
-                               continue;
-                       if (test_bit(CACHE_PENDING, &ch->flags))
-                               /* Lost a race and it is pending again */
-                               break;
-                       if (cr->readers != 0)
-                               continue;
-                       list_move(&cr->q.list, &dequeued);
-               }
+       list_for_each_entry_safe(cr, tmp, &detail->requests, list) {
+               if (cr->item != ch)
+                       continue;
+               if (test_bit(CACHE_PENDING, &ch->flags))
+                       /* Lost a race and it is pending again */
+                       break;
+               if (cr->readers != 0)
+                       continue;
+               list_move(&cr->list, &dequeued);
+       }
        spin_unlock(&detail->queue_lock);
        while (!list_empty(&dequeued)) {
-               cr = list_entry(dequeued.next, struct cache_request, q.list);
-               list_del(&cr->q.list);
+               cr = list_entry(dequeued.next, struct cache_request, list);
+               list_del(&cr->list);
                cache_put(cr->item, detail);
                kfree(cr->buf);
                kfree(cr);
@@ -1245,14 +1220,14 @@ static int cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
                return -EAGAIN;
        }
 
-       crq->q.reader = 0;
        crq->buf = buf;
        crq->len = 0;
        crq->readers = 0;
        spin_lock(&detail->queue_lock);
        if (test_bit(CACHE_PENDING, &h->flags)) {
                crq->item = cache_get(h);
-               list_add_tail(&crq->q.list, &detail->queue);
+               crq->seqno = detail->next_seqno++;
+               list_add_tail(&crq->list, &detail->requests);
                trace_cache_entry_upcall(detail, h);
        } else
                /* Lost a race, no longer PENDING, so don't enqueue */