void sunrpc_init_cache_detail(struct cache_detail *cd)
{
spin_lock_init(&cd->hash_lock);
- INIT_LIST_HEAD(&cd->queue);
+ INIT_LIST_HEAD(&cd->requests);
+ INIT_LIST_HEAD(&cd->readers);
spin_lock_init(&cd->queue_lock);
init_waitqueue_head(&cd->queue_wait);
+ cd->next_seqno = 0;
spin_lock(&cache_list_lock);
cd->nextcheck = 0;
cd->entries = 0;
* On read, you get a full request, or block.
* On write, an update request is processed.
* Poll works if anything to read, and always allows write.
- *
- * Implemented by linked list of requests. Each open file has
- * a ->private that also exists in this list. New requests are added
- * to the end and may wakeup and preceding readers.
- * New readers are added to the head. If, on read, an item is found with
- * CACHE_UPCALLING clear, we free it from the list.
- *
*/
-struct cache_queue {
- struct list_head list;
- int reader; /* if 0, then request */
-};
struct cache_request {
- struct cache_queue q;
+ struct list_head list;
struct cache_head *item;
- char * buf;
+ char *buf;
int len;
int readers;
+ u64 seqno;
};
struct cache_reader {
- struct cache_queue q;
+ struct list_head list;
int offset; /* if non-0, we have a refcnt on next request */
+ u64 next_seqno;
};
static int cache_request(struct cache_detail *detail,
return PAGE_SIZE - len;
}
+static struct cache_request *
+cache_next_request(struct cache_detail *cd, u64 seqno)
+{
+ struct cache_request *rq;
+
+ list_for_each_entry(rq, &cd->requests, list)
+ if (rq->seqno >= seqno)
+ return rq;
+ return NULL;
+}
+
static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
loff_t *ppos, struct cache_detail *cd)
{
again:
spin_lock(&cd->queue_lock);
/* need to find next request */
- while (rp->q.list.next != &cd->queue &&
- list_entry(rp->q.list.next, struct cache_queue, list)
- ->reader) {
- struct list_head *next = rp->q.list.next;
- list_move(&rp->q.list, next);
- }
- if (rp->q.list.next == &cd->queue) {
+ rq = cache_next_request(cd, rp->next_seqno);
+ if (!rq) {
spin_unlock(&cd->queue_lock);
inode_unlock(inode);
WARN_ON_ONCE(rp->offset);
return 0;
}
- rq = container_of(rp->q.list.next, struct cache_request, q.list);
- WARN_ON_ONCE(rq->q.reader);
if (rp->offset == 0)
rq->readers++;
spin_unlock(&cd->queue_lock);
if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) {
err = -EAGAIN;
- spin_lock(&cd->queue_lock);
- list_move(&rp->q.list, &rq->q.list);
- spin_unlock(&cd->queue_lock);
+ rp->next_seqno = rq->seqno + 1;
} else {
if (rp->offset + count > rq->len)
count = rq->len - rp->offset;
rp->offset += count;
if (rp->offset >= rq->len) {
rp->offset = 0;
- spin_lock(&cd->queue_lock);
- list_move(&rp->q.list, &rq->q.list);
- spin_unlock(&cd->queue_lock);
+ rp->next_seqno = rq->seqno + 1;
}
err = 0;
}
rq->readers--;
if (rq->readers == 0 &&
!test_bit(CACHE_PENDING, &rq->item->flags)) {
- list_del(&rq->q.list);
+ list_del(&rq->list);
spin_unlock(&cd->queue_lock);
cache_put(rq->item, cd);
kfree(rq->buf);
{
__poll_t mask;
struct cache_reader *rp = filp->private_data;
- struct cache_queue *cq;
poll_wait(filp, &cd->queue_wait, wait);
spin_lock(&cd->queue_lock);
- for (cq= &rp->q; &cq->list != &cd->queue;
- cq = list_entry(cq->list.next, struct cache_queue, list))
- if (!cq->reader) {
- mask |= EPOLLIN | EPOLLRDNORM;
- break;
- }
+ if (cache_next_request(cd, rp->next_seqno))
+ mask |= EPOLLIN | EPOLLRDNORM;
spin_unlock(&cd->queue_lock);
return mask;
}
{
int len = 0;
struct cache_reader *rp = filp->private_data;
- struct cache_queue *cq;
+ struct cache_request *rq;
if (cmd != FIONREAD || !rp)
return -EINVAL;
/* only find the length remaining in current request,
* or the length of the next request
*/
- for (cq= &rp->q; &cq->list != &cd->queue;
- cq = list_entry(cq->list.next, struct cache_queue, list))
- if (!cq->reader) {
- struct cache_request *cr =
- container_of(cq, struct cache_request, q);
- len = cr->len - rp->offset;
- break;
- }
+ rq = cache_next_request(cd, rp->next_seqno);
+ if (rq)
+ len = rq->len - rp->offset;
spin_unlock(&cd->queue_lock);
return put_user(len, (int __user *)arg);
return -ENOMEM;
}
rp->offset = 0;
- rp->q.reader = 1;
+ rp->next_seqno = 0;
spin_lock(&cd->queue_lock);
- list_add(&rp->q.list, &cd->queue);
+ list_add(&rp->list, &cd->readers);
spin_unlock(&cd->queue_lock);
}
if (filp->f_mode & FMODE_WRITE)
spin_lock(&cd->queue_lock);
if (rp->offset) {
- struct cache_queue *cq;
- for (cq = &rp->q; &cq->list != &cd->queue;
- cq = list_entry(cq->list.next,
- struct cache_queue, list))
- if (!cq->reader) {
- struct cache_request *cr =
- container_of(cq,
- struct cache_request, q);
- cr->readers--;
- if (cr->readers == 0 &&
- !test_bit(CACHE_PENDING,
- &cr->item->flags)) {
- list_del(&cr->q.list);
- rq = cr;
- }
- break;
+ struct cache_request *cr;
+
+ cr = cache_next_request(cd, rp->next_seqno);
+ if (cr) {
+ cr->readers--;
+ if (cr->readers == 0 &&
+ !test_bit(CACHE_PENDING,
+ &cr->item->flags)) {
+ list_del(&cr->list);
+ rq = cr;
}
+ }
rp->offset = 0;
}
- list_del(&rp->q.list);
+ list_del(&rp->list);
spin_unlock(&cd->queue_lock);
if (rq) {
static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
{
- struct cache_queue *cq, *tmp;
- struct cache_request *cr;
+ struct cache_request *cr, *tmp;
LIST_HEAD(dequeued);
spin_lock(&detail->queue_lock);
- list_for_each_entry_safe(cq, tmp, &detail->queue, list)
- if (!cq->reader) {
- cr = container_of(cq, struct cache_request, q);
- if (cr->item != ch)
- continue;
- if (test_bit(CACHE_PENDING, &ch->flags))
- /* Lost a race and it is pending again */
- break;
- if (cr->readers != 0)
- continue;
- list_move(&cr->q.list, &dequeued);
- }
+ list_for_each_entry_safe(cr, tmp, &detail->requests, list) {
+ if (cr->item != ch)
+ continue;
+ if (test_bit(CACHE_PENDING, &ch->flags))
+ /* Lost a race and it is pending again */
+ break;
+ if (cr->readers != 0)
+ continue;
+ list_move(&cr->list, &dequeued);
+ }
spin_unlock(&detail->queue_lock);
while (!list_empty(&dequeued)) {
- cr = list_entry(dequeued.next, struct cache_request, q.list);
- list_del(&cr->q.list);
+ cr = list_entry(dequeued.next, struct cache_request, list);
+ list_del(&cr->list);
cache_put(cr->item, detail);
kfree(cr->buf);
kfree(cr);
return -EAGAIN;
}
- crq->q.reader = 0;
crq->buf = buf;
crq->len = 0;
crq->readers = 0;
spin_lock(&detail->queue_lock);
if (test_bit(CACHE_PENDING, &h->flags)) {
crq->item = cache_get(h);
- list_add_tail(&crq->q.list, &detail->queue);
+ crq->seqno = detail->next_seqno++;
+ list_add_tail(&crq->list, &detail->requests);
trace_cache_entry_upcall(detail, h);
} else
/* Lost a race, no longer PENDING, so don't enqueue */