sunrpc: split cache_detail queue into request and reader lists

Replace the single interleaved queue (which mixed cache_request and
cache_reader entries distinguished by a ->reader flag) with two
dedicated lists: cd->requests for upcall requests and cd->readers
for open file handles.

Readers now track their position via a monotonically increasing
sequence number (next_seqno) rather than by their position in the
shared list. Each cache_request is assigned a seqno when enqueued,
and a new cache_next_request() helper finds the next request at or
after a given seqno.

This eliminates the cache_queue wrapper struct entirely, simplifies
the reader-skipping loops in cache_read/cache_poll/cache_ioctl/
cache_release, and makes the data flow easier to reason about.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
This commit is contained in:
Jeff Layton
2026-02-23 12:10:01 -05:00
committed by Chuck Lever
parent 552d0e17ea
commit facc4e3c80
2 changed files with 62 additions and 85 deletions

View File

@@ -113,9 +113,11 @@ struct cache_detail {
int entries; int entries;
/* fields for communication over channel */ /* fields for communication over channel */
struct list_head queue; struct list_head requests;
struct list_head readers;
spinlock_t queue_lock; spinlock_t queue_lock;
wait_queue_head_t queue_wait; wait_queue_head_t queue_wait;
u64 next_seqno;
atomic_t writers; /* how many time is /channel open */ atomic_t writers; /* how many time is /channel open */
time64_t last_close; /* if no writers, when did last close */ time64_t last_close; /* if no writers, when did last close */

View File

@@ -399,9 +399,11 @@ static struct delayed_work cache_cleaner;
void sunrpc_init_cache_detail(struct cache_detail *cd) void sunrpc_init_cache_detail(struct cache_detail *cd)
{ {
spin_lock_init(&cd->hash_lock); spin_lock_init(&cd->hash_lock);
INIT_LIST_HEAD(&cd->queue); INIT_LIST_HEAD(&cd->requests);
INIT_LIST_HEAD(&cd->readers);
spin_lock_init(&cd->queue_lock); spin_lock_init(&cd->queue_lock);
init_waitqueue_head(&cd->queue_wait); init_waitqueue_head(&cd->queue_wait);
cd->next_seqno = 0;
spin_lock(&cache_list_lock); spin_lock(&cache_list_lock);
cd->nextcheck = 0; cd->nextcheck = 0;
cd->entries = 0; cd->entries = 0;
@@ -796,29 +798,20 @@ void cache_clean_deferred(void *owner)
* On read, you get a full request, or block. * On read, you get a full request, or block.
* On write, an update request is processed. * On write, an update request is processed.
* Poll works if anything to read, and always allows write. * Poll works if anything to read, and always allows write.
*
* Implemented by linked list of requests. Each open file has
* a ->private that also exists in this list. New requests are added
* to the end and may wakeup and preceding readers.
* New readers are added to the head. If, on read, an item is found with
* CACHE_UPCALLING clear, we free it from the list.
*
*/ */
struct cache_queue {
struct list_head list;
int reader; /* if 0, then request */
};
struct cache_request { struct cache_request {
struct cache_queue q; struct list_head list;
struct cache_head *item; struct cache_head *item;
char * buf; char *buf;
int len; int len;
int readers; int readers;
u64 seqno;
}; };
struct cache_reader { struct cache_reader {
struct cache_queue q; struct list_head list;
int offset; /* if non-0, we have a refcnt on next request */ int offset; /* if non-0, we have a refcnt on next request */
u64 next_seqno;
}; };
static int cache_request(struct cache_detail *detail, static int cache_request(struct cache_detail *detail,
@@ -833,6 +826,17 @@ static int cache_request(struct cache_detail *detail,
return PAGE_SIZE - len; return PAGE_SIZE - len;
} }
/*
 * Find the first request on cd->requests whose sequence number is at or
 * beyond @seqno.
 *
 * Requests are appended to the tail of cd->requests with an increasing
 * seqno, so the first match in list order is the one with the lowest
 * qualifying seqno. The callers visible in this change invoke this
 * helper with cd->queue_lock held.
 *
 * Returns the matching request, or NULL if no queued request has a
 * seqno >= @seqno.
 */
static struct cache_request *
cache_next_request(struct cache_detail *cd, u64 seqno)
{
	struct cache_request *req;

	list_for_each_entry(req, &cd->requests, list) {
		if (seqno <= req->seqno)
			return req;
	}

	return NULL;
}
static ssize_t cache_read(struct file *filp, char __user *buf, size_t count, static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
loff_t *ppos, struct cache_detail *cd) loff_t *ppos, struct cache_detail *cd)
{ {
@@ -849,20 +853,13 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
again: again:
spin_lock(&cd->queue_lock); spin_lock(&cd->queue_lock);
/* need to find next request */ /* need to find next request */
while (rp->q.list.next != &cd->queue && rq = cache_next_request(cd, rp->next_seqno);
list_entry(rp->q.list.next, struct cache_queue, list) if (!rq) {
->reader) {
struct list_head *next = rp->q.list.next;
list_move(&rp->q.list, next);
}
if (rp->q.list.next == &cd->queue) {
spin_unlock(&cd->queue_lock); spin_unlock(&cd->queue_lock);
inode_unlock(inode); inode_unlock(inode);
WARN_ON_ONCE(rp->offset); WARN_ON_ONCE(rp->offset);
return 0; return 0;
} }
rq = container_of(rp->q.list.next, struct cache_request, q.list);
WARN_ON_ONCE(rq->q.reader);
if (rp->offset == 0) if (rp->offset == 0)
rq->readers++; rq->readers++;
spin_unlock(&cd->queue_lock); spin_unlock(&cd->queue_lock);
@@ -876,9 +873,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) { if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) {
err = -EAGAIN; err = -EAGAIN;
spin_lock(&cd->queue_lock); rp->next_seqno = rq->seqno + 1;
list_move(&rp->q.list, &rq->q.list);
spin_unlock(&cd->queue_lock);
} else { } else {
if (rp->offset + count > rq->len) if (rp->offset + count > rq->len)
count = rq->len - rp->offset; count = rq->len - rp->offset;
@@ -888,9 +883,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
rp->offset += count; rp->offset += count;
if (rp->offset >= rq->len) { if (rp->offset >= rq->len) {
rp->offset = 0; rp->offset = 0;
spin_lock(&cd->queue_lock); rp->next_seqno = rq->seqno + 1;
list_move(&rp->q.list, &rq->q.list);
spin_unlock(&cd->queue_lock);
} }
err = 0; err = 0;
} }
@@ -901,7 +894,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
rq->readers--; rq->readers--;
if (rq->readers == 0 && if (rq->readers == 0 &&
!test_bit(CACHE_PENDING, &rq->item->flags)) { !test_bit(CACHE_PENDING, &rq->item->flags)) {
list_del(&rq->q.list); list_del(&rq->list);
spin_unlock(&cd->queue_lock); spin_unlock(&cd->queue_lock);
cache_put(rq->item, cd); cache_put(rq->item, cd);
kfree(rq->buf); kfree(rq->buf);
@@ -976,7 +969,6 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
{ {
__poll_t mask; __poll_t mask;
struct cache_reader *rp = filp->private_data; struct cache_reader *rp = filp->private_data;
struct cache_queue *cq;
poll_wait(filp, &cd->queue_wait, wait); poll_wait(filp, &cd->queue_wait, wait);
@@ -988,12 +980,8 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
spin_lock(&cd->queue_lock); spin_lock(&cd->queue_lock);
for (cq= &rp->q; &cq->list != &cd->queue; if (cache_next_request(cd, rp->next_seqno))
cq = list_entry(cq->list.next, struct cache_queue, list)) mask |= EPOLLIN | EPOLLRDNORM;
if (!cq->reader) {
mask |= EPOLLIN | EPOLLRDNORM;
break;
}
spin_unlock(&cd->queue_lock); spin_unlock(&cd->queue_lock);
return mask; return mask;
} }
@@ -1004,7 +992,7 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
{ {
int len = 0; int len = 0;
struct cache_reader *rp = filp->private_data; struct cache_reader *rp = filp->private_data;
struct cache_queue *cq; struct cache_request *rq;
if (cmd != FIONREAD || !rp) if (cmd != FIONREAD || !rp)
return -EINVAL; return -EINVAL;
@@ -1014,14 +1002,9 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
/* only find the length remaining in current request, /* only find the length remaining in current request,
* or the length of the next request * or the length of the next request
*/ */
for (cq= &rp->q; &cq->list != &cd->queue; rq = cache_next_request(cd, rp->next_seqno);
cq = list_entry(cq->list.next, struct cache_queue, list)) if (rq)
if (!cq->reader) { len = rq->len - rp->offset;
struct cache_request *cr =
container_of(cq, struct cache_request, q);
len = cr->len - rp->offset;
break;
}
spin_unlock(&cd->queue_lock); spin_unlock(&cd->queue_lock);
return put_user(len, (int __user *)arg); return put_user(len, (int __user *)arg);
@@ -1042,10 +1025,10 @@ static int cache_open(struct inode *inode, struct file *filp,
return -ENOMEM; return -ENOMEM;
} }
rp->offset = 0; rp->offset = 0;
rp->q.reader = 1; rp->next_seqno = 0;
spin_lock(&cd->queue_lock); spin_lock(&cd->queue_lock);
list_add(&rp->q.list, &cd->queue); list_add(&rp->list, &cd->readers);
spin_unlock(&cd->queue_lock); spin_unlock(&cd->queue_lock);
} }
if (filp->f_mode & FMODE_WRITE) if (filp->f_mode & FMODE_WRITE)
@@ -1064,26 +1047,21 @@ static int cache_release(struct inode *inode, struct file *filp,
spin_lock(&cd->queue_lock); spin_lock(&cd->queue_lock);
if (rp->offset) { if (rp->offset) {
struct cache_queue *cq; struct cache_request *cr;
for (cq = &rp->q; &cq->list != &cd->queue;
cq = list_entry(cq->list.next, cr = cache_next_request(cd, rp->next_seqno);
struct cache_queue, list)) if (cr) {
if (!cq->reader) { cr->readers--;
struct cache_request *cr = if (cr->readers == 0 &&
container_of(cq, !test_bit(CACHE_PENDING,
struct cache_request, q); &cr->item->flags)) {
cr->readers--; list_del(&cr->list);
if (cr->readers == 0 && rq = cr;
!test_bit(CACHE_PENDING,
&cr->item->flags)) {
list_del(&cr->q.list);
rq = cr;
}
break;
} }
}
rp->offset = 0; rp->offset = 0;
} }
list_del(&rp->q.list); list_del(&rp->list);
spin_unlock(&cd->queue_lock); spin_unlock(&cd->queue_lock);
if (rq) { if (rq) {
@@ -1107,27 +1085,24 @@ static int cache_release(struct inode *inode, struct file *filp,
static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch) static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
{ {
struct cache_queue *cq, *tmp; struct cache_request *cr, *tmp;
struct cache_request *cr;
LIST_HEAD(dequeued); LIST_HEAD(dequeued);
spin_lock(&detail->queue_lock); spin_lock(&detail->queue_lock);
list_for_each_entry_safe(cq, tmp, &detail->queue, list) list_for_each_entry_safe(cr, tmp, &detail->requests, list) {
if (!cq->reader) { if (cr->item != ch)
cr = container_of(cq, struct cache_request, q); continue;
if (cr->item != ch) if (test_bit(CACHE_PENDING, &ch->flags))
continue; /* Lost a race and it is pending again */
if (test_bit(CACHE_PENDING, &ch->flags)) break;
/* Lost a race and it is pending again */ if (cr->readers != 0)
break; continue;
if (cr->readers != 0) list_move(&cr->list, &dequeued);
continue; }
list_move(&cr->q.list, &dequeued);
}
spin_unlock(&detail->queue_lock); spin_unlock(&detail->queue_lock);
while (!list_empty(&dequeued)) { while (!list_empty(&dequeued)) {
cr = list_entry(dequeued.next, struct cache_request, q.list); cr = list_entry(dequeued.next, struct cache_request, list);
list_del(&cr->q.list); list_del(&cr->list);
cache_put(cr->item, detail); cache_put(cr->item, detail);
kfree(cr->buf); kfree(cr->buf);
kfree(cr); kfree(cr);
@@ -1245,14 +1220,14 @@ static int cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
return -EAGAIN; return -EAGAIN;
} }
crq->q.reader = 0;
crq->buf = buf; crq->buf = buf;
crq->len = 0; crq->len = 0;
crq->readers = 0; crq->readers = 0;
spin_lock(&detail->queue_lock); spin_lock(&detail->queue_lock);
if (test_bit(CACHE_PENDING, &h->flags)) { if (test_bit(CACHE_PENDING, &h->flags)) {
crq->item = cache_get(h); crq->item = cache_get(h);
list_add_tail(&crq->q.list, &detail->queue); crq->seqno = detail->next_seqno++;
list_add_tail(&crq->list, &detail->requests);
trace_cache_entry_upcall(detail, h); trace_cache_entry_upcall(detail, h);
} else } else
/* Lost a race, no longer PENDING, so don't enqueue */ /* Lost a race, no longer PENDING, so don't enqueue */