From f08068df4aa411465319bfb796dc6a2228131bc5 Mon Sep 17 00:00:00 2001 From: Viacheslav Dubeyko Date: Tue, 4 Feb 2025 16:02:46 -0800 Subject: [PATCH 1/4] ceph: extend ceph_writeback_ctl for ceph_writepages_start() refactoring The ceph_writepages_start() has unreasonably huge size and complex logic that makes this method hard to understand. Current state of the method's logic makes bug fix really hard task. This patch extends the struct ceph_writeback_ctl with the goal to make ceph_writepages_start() method more compact and easy to understand by means of deep refactoring. Signed-off-by: Viacheslav Dubeyko Link: https://lore.kernel.org/r/20250205000249.123054-2-slava@dubeyko.com Tested-by: David Howells Signed-off-by: Christian Brauner --- fs/ceph/addr.c | 485 ++++++++++++++++++++++++++++++------------------- 1 file changed, 301 insertions(+), 184 deletions(-) diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index f5224a566b69..d002ff62d867 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -568,7 +568,36 @@ struct ceph_writeback_ctl u64 truncate_size; u32 truncate_seq; bool size_stable; + bool head_snapc; + struct ceph_snap_context *snapc; + struct ceph_snap_context *last_snapc; + + bool done; + bool should_loop; + bool range_whole; + pgoff_t start_index; + pgoff_t index; + pgoff_t end; + xa_mark_t tag; + + pgoff_t strip_unit_end; + unsigned int wsize; + unsigned int nr_folios; + unsigned int max_pages; + unsigned int locked_pages; + + int op_idx; + int num_ops; + u64 offset; + u64 len; + + struct folio_batch fbatch; + unsigned int processed_in_fbatch; + + bool from_pool; + struct page **pages; + struct page **data_pages; }; /* @@ -949,6 +978,74 @@ static void writepages_finish(struct ceph_osd_request *req) ceph_dec_osd_stopping_blocker(fsc->mdsc); } +static inline +unsigned int ceph_define_write_size(struct address_space *mapping) +{ + struct inode *inode = mapping->host; + struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); + unsigned int wsize = 
i_blocksize(inode); + + if (fsc->mount_options->wsize < wsize) + wsize = fsc->mount_options->wsize; + + return wsize; +} + +static inline +void ceph_folio_batch_init(struct ceph_writeback_ctl *ceph_wbc) +{ + folio_batch_init(&ceph_wbc->fbatch); + ceph_wbc->processed_in_fbatch = 0; +} + +static inline +void ceph_folio_batch_reinit(struct ceph_writeback_ctl *ceph_wbc) +{ + folio_batch_release(&ceph_wbc->fbatch); + ceph_folio_batch_init(ceph_wbc); +} + +static inline +void ceph_init_writeback_ctl(struct address_space *mapping, + struct writeback_control *wbc, + struct ceph_writeback_ctl *ceph_wbc) +{ + ceph_wbc->snapc = NULL; + ceph_wbc->last_snapc = NULL; + + ceph_wbc->strip_unit_end = 0; + ceph_wbc->wsize = ceph_define_write_size(mapping); + + ceph_wbc->nr_folios = 0; + ceph_wbc->max_pages = 0; + ceph_wbc->locked_pages = 0; + + ceph_wbc->done = false; + ceph_wbc->should_loop = false; + ceph_wbc->range_whole = false; + + ceph_wbc->start_index = wbc->range_cyclic ? mapping->writeback_index : 0; + ceph_wbc->index = ceph_wbc->start_index; + ceph_wbc->end = -1; + + if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages) { + ceph_wbc->tag = PAGECACHE_TAG_TOWRITE; + } else { + ceph_wbc->tag = PAGECACHE_TAG_DIRTY; + } + + ceph_wbc->op_idx = -1; + ceph_wbc->num_ops = 0; + ceph_wbc->offset = 0; + ceph_wbc->len = 0; + ceph_wbc->from_pool = false; + + ceph_folio_batch_init(ceph_wbc); + + ceph_wbc->pages = NULL; + ceph_wbc->data_pages = NULL; +} + /* * initiate async writeback */ @@ -960,17 +1057,11 @@ static int ceph_writepages_start(struct address_space *mapping, struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); struct ceph_client *cl = fsc->client; struct ceph_vino vino = ceph_vino(inode); - pgoff_t index, start_index, end = -1; - struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc; - struct folio_batch fbatch; - int rc = 0; - unsigned int wsize = i_blocksize(inode); - struct ceph_osd_request *req = NULL; + struct ceph_snap_context *pgsnapc; 
struct ceph_writeback_ctl ceph_wbc; - bool should_loop, range_whole = false; - bool done = false; + struct ceph_osd_request *req = NULL; + int rc = 0; bool caching = ceph_is_cache_enabled(inode); - xa_mark_t tag; if (wbc->sync_mode == WB_SYNC_NONE && fsc->write_congested) @@ -989,86 +1080,78 @@ static int ceph_writepages_start(struct address_space *mapping, mapping_set_error(mapping, -EIO); return -EIO; /* we're in a forced umount, don't write! */ } - if (fsc->mount_options->wsize < wsize) - wsize = fsc->mount_options->wsize; - folio_batch_init(&fbatch); + ceph_init_writeback_ctl(mapping, wbc, &ceph_wbc); - start_index = wbc->range_cyclic ? mapping->writeback_index : 0; - index = start_index; - - if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages) { - tag = PAGECACHE_TAG_TOWRITE; - } else { - tag = PAGECACHE_TAG_DIRTY; - } retry: /* find oldest snap context with dirty data */ - snapc = get_oldest_context(inode, &ceph_wbc, NULL); - if (!snapc) { + ceph_wbc.snapc = get_oldest_context(inode, &ceph_wbc, NULL); + if (!ceph_wbc.snapc) { /* hmm, why does writepages get called when there is no dirty data? */ doutc(cl, " no snap context with dirty data?\n"); goto out; } - doutc(cl, " oldest snapc is %p seq %lld (%d snaps)\n", snapc, - snapc->seq, snapc->num_snaps); + doutc(cl, " oldest snapc is %p seq %lld (%d snaps)\n", + ceph_wbc.snapc, ceph_wbc.snapc->seq, + ceph_wbc.snapc->num_snaps); - should_loop = false; - if (ceph_wbc.head_snapc && snapc != last_snapc) { + ceph_wbc.should_loop = false; + if (ceph_wbc.head_snapc && ceph_wbc.snapc != ceph_wbc.last_snapc) { /* where to start/end? 
*/ if (wbc->range_cyclic) { - index = start_index; - end = -1; - if (index > 0) - should_loop = true; - doutc(cl, " cyclic, start at %lu\n", index); + ceph_wbc.index = ceph_wbc.start_index; + ceph_wbc.end = -1; + if (ceph_wbc.index > 0) + ceph_wbc.should_loop = true; + doutc(cl, " cyclic, start at %lu\n", ceph_wbc.index); } else { - index = wbc->range_start >> PAGE_SHIFT; - end = wbc->range_end >> PAGE_SHIFT; + ceph_wbc.index = wbc->range_start >> PAGE_SHIFT; + ceph_wbc.end = wbc->range_end >> PAGE_SHIFT; if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX) - range_whole = true; - doutc(cl, " not cyclic, %lu to %lu\n", index, end); + ceph_wbc.range_whole = true; + doutc(cl, " not cyclic, %lu to %lu\n", + ceph_wbc.index, ceph_wbc.end); } } else if (!ceph_wbc.head_snapc) { /* Do not respect wbc->range_{start,end}. Dirty pages * in that range can be associated with newer snapc. * They are not writeable until we write all dirty pages * associated with 'snapc' get written */ - if (index > 0) - should_loop = true; + if (ceph_wbc.index > 0) + ceph_wbc.should_loop = true; doutc(cl, " non-head snapc, range whole\n"); } if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages) - tag_pages_for_writeback(mapping, index, end); + tag_pages_for_writeback(mapping, ceph_wbc.index, ceph_wbc.end); - ceph_put_snap_context(last_snapc); - last_snapc = snapc; + ceph_put_snap_context(ceph_wbc.last_snapc); + ceph_wbc.last_snapc = ceph_wbc.snapc; - while (!done && index <= end) { - int num_ops = 0, op_idx; - unsigned i, nr_folios, max_pages, locked_pages = 0; - struct page **pages = NULL, **data_pages; + while (!ceph_wbc.done && ceph_wbc.index <= ceph_wbc.end) { + unsigned i; struct page *page; - pgoff_t strip_unit_end = 0; - u64 offset = 0, len = 0; - bool from_pool = false; - max_pages = wsize >> PAGE_SHIFT; + ceph_wbc.max_pages = ceph_wbc.wsize >> PAGE_SHIFT; get_more_pages: - nr_folios = filemap_get_folios_tag(mapping, &index, - end, tag, &fbatch); - doutc(cl, 
"pagevec_lookup_range_tag got %d\n", nr_folios); - if (!nr_folios && !locked_pages) + ceph_wbc.nr_folios = filemap_get_folios_tag(mapping, + &ceph_wbc.index, + ceph_wbc.end, + ceph_wbc.tag, + &ceph_wbc.fbatch); + doutc(cl, "pagevec_lookup_range_tag got %d\n", + ceph_wbc.nr_folios); + if (!ceph_wbc.nr_folios && !ceph_wbc.locked_pages) break; - for (i = 0; i < nr_folios && locked_pages < max_pages; i++) { - struct folio *folio = fbatch.folios[i]; + for (i = 0; i < ceph_wbc.nr_folios && + ceph_wbc.locked_pages < ceph_wbc.max_pages; i++) { + struct folio *folio = ceph_wbc.fbatch.folios[i]; page = &folio->page; doutc(cl, "? %p idx %lu\n", page, page->index); - if (locked_pages == 0) + if (ceph_wbc.locked_pages == 0) lock_page(page); /* first page */ else if (!trylock_page(page)) break; @@ -1082,13 +1165,14 @@ static int ceph_writepages_start(struct address_space *mapping, } /* only if matching snap context */ pgsnapc = page_snap_context(page); - if (pgsnapc != snapc) { + if (pgsnapc != ceph_wbc.snapc) { doutc(cl, "page snapc %p %lld != oldest %p %lld\n", - pgsnapc, pgsnapc->seq, snapc, snapc->seq); - if (!should_loop && + pgsnapc, pgsnapc->seq, + ceph_wbc.snapc, ceph_wbc.snapc->seq); + if (!ceph_wbc.should_loop && !ceph_wbc.head_snapc && wbc->sync_mode != WB_SYNC_NONE) - should_loop = true; + ceph_wbc.should_loop = true; unlock_page(page); continue; } @@ -1103,7 +1187,8 @@ static int ceph_writepages_start(struct address_space *mapping, folio_unlock(folio); continue; } - if (strip_unit_end && (page->index > strip_unit_end)) { + if (ceph_wbc.strip_unit_end && + (page->index > ceph_wbc.strip_unit_end)) { doutc(cl, "end of strip unit %p\n", page); unlock_page(page); break; @@ -1132,47 +1217,52 @@ static int ceph_writepages_start(struct address_space *mapping, * calculate max possinle write size and * allocate a page array */ - if (locked_pages == 0) { + if (ceph_wbc.locked_pages == 0) { u64 objnum; u64 objoff; u32 xlen; /* prepare async write request */ - offset = 
(u64)page_offset(page); + ceph_wbc.offset = (u64)page_offset(page); ceph_calc_file_object_mapping(&ci->i_layout, - offset, wsize, + ceph_wbc.offset, + ceph_wbc.wsize, &objnum, &objoff, &xlen); - len = xlen; + ceph_wbc.len = xlen; - num_ops = 1; - strip_unit_end = page->index + - ((len - 1) >> PAGE_SHIFT); + ceph_wbc.num_ops = 1; + ceph_wbc.strip_unit_end = page->index + + ((ceph_wbc.len - 1) >> PAGE_SHIFT); - BUG_ON(pages); - max_pages = calc_pages_for(0, (u64)len); - pages = kmalloc_array(max_pages, - sizeof(*pages), + BUG_ON(ceph_wbc.pages); + ceph_wbc.max_pages = + calc_pages_for(0, (u64)ceph_wbc.len); + ceph_wbc.pages = kmalloc_array(ceph_wbc.max_pages, + sizeof(*ceph_wbc.pages), GFP_NOFS); - if (!pages) { - from_pool = true; - pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS); - BUG_ON(!pages); + if (!ceph_wbc.pages) { + ceph_wbc.from_pool = true; + ceph_wbc.pages = + mempool_alloc(ceph_wb_pagevec_pool, + GFP_NOFS); + BUG_ON(!ceph_wbc.pages); } - len = 0; + ceph_wbc.len = 0; } else if (page->index != - (offset + len) >> PAGE_SHIFT) { - if (num_ops >= (from_pool ? CEPH_OSD_SLAB_OPS : + (ceph_wbc.offset + ceph_wbc.len) >> PAGE_SHIFT) { + if (ceph_wbc.num_ops >= + (ceph_wbc.from_pool ? CEPH_OSD_SLAB_OPS : CEPH_OSD_MAX_OPS)) { redirty_page_for_writepage(wbc, page); unlock_page(page); break; } - num_ops++; - offset = (u64)page_offset(page); - len = 0; + ceph_wbc.num_ops++; + ceph_wbc.offset = (u64)page_offset(page); + ceph_wbc.len = 0; } /* note position of first page in fbatch */ @@ -1185,78 +1275,85 @@ static int ceph_writepages_start(struct address_space *mapping, fsc->write_congested = true; if (IS_ENCRYPTED(inode)) { - pages[locked_pages] = + ceph_wbc.pages[ceph_wbc.locked_pages] = fscrypt_encrypt_pagecache_blocks(page, PAGE_SIZE, 0, - locked_pages ? GFP_NOWAIT : GFP_NOFS); - if (IS_ERR(pages[locked_pages])) { - if (PTR_ERR(pages[locked_pages]) == -EINVAL) + ceph_wbc.locked_pages ? 
+ GFP_NOWAIT : GFP_NOFS); + if (IS_ERR(ceph_wbc.pages[ceph_wbc.locked_pages])) { + if (PTR_ERR(ceph_wbc.pages[ceph_wbc.locked_pages]) == -EINVAL) pr_err_client(cl, "inode->i_blkbits=%hhu\n", inode->i_blkbits); /* better not fail on first page! */ - BUG_ON(locked_pages == 0); - pages[locked_pages] = NULL; + BUG_ON(ceph_wbc.locked_pages == 0); + ceph_wbc.pages[ceph_wbc.locked_pages] = NULL; redirty_page_for_writepage(wbc, page); unlock_page(page); break; } - ++locked_pages; + ++ceph_wbc.locked_pages; } else { - pages[locked_pages++] = page; + ceph_wbc.pages[ceph_wbc.locked_pages++] = page; } - fbatch.folios[i] = NULL; - len += thp_size(page); + ceph_wbc.fbatch.folios[i] = NULL; + ceph_wbc.len += thp_size(page); } /* did we get anything? */ - if (!locked_pages) + if (!ceph_wbc.locked_pages) goto release_folios; if (i) { unsigned j, n = 0; /* shift unused page to beginning of fbatch */ - for (j = 0; j < nr_folios; j++) { - if (!fbatch.folios[j]) + for (j = 0; j < ceph_wbc.nr_folios; j++) { + if (!ceph_wbc.fbatch.folios[j]) continue; - if (n < j) - fbatch.folios[n] = fbatch.folios[j]; + if (n < j) { + ceph_wbc.fbatch.folios[n] = + ceph_wbc.fbatch.folios[j]; + } n++; } - fbatch.nr = n; + ceph_wbc.fbatch.nr = n; - if (nr_folios && i == nr_folios && - locked_pages < max_pages) { + if (ceph_wbc.nr_folios && i == ceph_wbc.nr_folios && + ceph_wbc.locked_pages < ceph_wbc.max_pages) { doutc(cl, "reached end fbatch, trying for more\n"); - folio_batch_release(&fbatch); + folio_batch_release(&ceph_wbc.fbatch); goto get_more_pages; } } new_request: - offset = ceph_fscrypt_page_offset(pages[0]); - len = wsize; + ceph_wbc.offset = ceph_fscrypt_page_offset(ceph_wbc.pages[0]); + ceph_wbc.len = ceph_wbc.wsize; req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, - offset, &len, 0, num_ops, + ceph_wbc.offset, &ceph_wbc.len, + 0, ceph_wbc.num_ops, CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, - snapc, ceph_wbc.truncate_seq, + ceph_wbc.snapc, ceph_wbc.truncate_seq, 
ceph_wbc.truncate_size, false); if (IS_ERR(req)) { req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, - offset, &len, 0, - min(num_ops, + ceph_wbc.offset, &ceph_wbc.len, + 0, min(ceph_wbc.num_ops, CEPH_OSD_SLAB_OPS), CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, - snapc, ceph_wbc.truncate_seq, + ceph_wbc.snapc, + ceph_wbc.truncate_seq, ceph_wbc.truncate_size, true); BUG_ON(IS_ERR(req)); } - BUG_ON(len < ceph_fscrypt_page_offset(pages[locked_pages - 1]) + - thp_size(pages[locked_pages - 1]) - offset); + BUG_ON(ceph_wbc.len < + ceph_fscrypt_page_offset(ceph_wbc.pages[ceph_wbc.locked_pages - 1]) + + thp_size(ceph_wbc.pages[ceph_wbc.locked_pages - 1]) - + ceph_wbc.offset); if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) { rc = -EIO; @@ -1266,100 +1363,118 @@ static int ceph_writepages_start(struct address_space *mapping, req->r_inode = inode; /* Format the osd request message and submit the write */ - len = 0; - data_pages = pages; - op_idx = 0; - for (i = 0; i < locked_pages; i++) { - struct page *page = ceph_fscrypt_pagecache_page(pages[i]); + ceph_wbc.len = 0; + ceph_wbc.data_pages = ceph_wbc.pages; + ceph_wbc.op_idx = 0; + for (i = 0; i < ceph_wbc.locked_pages; i++) { + struct page *page = + ceph_fscrypt_pagecache_page(ceph_wbc.pages[i]); u64 cur_offset = page_offset(page); /* * Discontinuity in page range? Ceph can handle that by just passing * multiple extents in the write op. */ - if (offset + len != cur_offset) { + if (ceph_wbc.offset + ceph_wbc.len != cur_offset) { /* If it's full, stop here */ - if (op_idx + 1 == req->r_num_ops) + if (ceph_wbc.op_idx + 1 == req->r_num_ops) break; /* Kick off an fscache write with what we have so far. 
*/ - ceph_fscache_write_to_cache(inode, offset, len, caching); + ceph_fscache_write_to_cache(inode, ceph_wbc.offset, + ceph_wbc.len, caching); /* Start a new extent */ - osd_req_op_extent_dup_last(req, op_idx, - cur_offset - offset); - doutc(cl, "got pages at %llu~%llu\n", offset, - len); - osd_req_op_extent_osd_data_pages(req, op_idx, - data_pages, len, 0, - from_pool, false); - osd_req_op_extent_update(req, op_idx, len); + osd_req_op_extent_dup_last(req, ceph_wbc.op_idx, + cur_offset - + ceph_wbc.offset); + doutc(cl, "got pages at %llu~%llu\n", + ceph_wbc.offset, + ceph_wbc.len); + osd_req_op_extent_osd_data_pages(req, + ceph_wbc.op_idx, + ceph_wbc.data_pages, + ceph_wbc.len, 0, + ceph_wbc.from_pool, false); + osd_req_op_extent_update(req, ceph_wbc.op_idx, + ceph_wbc.len); - len = 0; - offset = cur_offset; - data_pages = pages + i; - op_idx++; + ceph_wbc.len = 0; + ceph_wbc.offset = cur_offset; + ceph_wbc.data_pages = ceph_wbc.pages + i; + ceph_wbc.op_idx++; } set_page_writeback(page); if (caching) ceph_set_page_fscache(page); - len += thp_size(page); + ceph_wbc.len += thp_size(page); } - ceph_fscache_write_to_cache(inode, offset, len, caching); + ceph_fscache_write_to_cache(inode, ceph_wbc.offset, + ceph_wbc.len, caching); if (ceph_wbc.size_stable) { - len = min(len, ceph_wbc.i_size - offset); - } else if (i == locked_pages) { + ceph_wbc.len = min(ceph_wbc.len, + ceph_wbc.i_size - ceph_wbc.offset); + } else if (i == ceph_wbc.locked_pages) { /* writepages_finish() clears writeback pages * according to the data length, so make sure * data length covers all locked pages */ - u64 min_len = len + 1 - thp_size(page); - len = get_writepages_data_length(inode, pages[i - 1], - offset); - len = max(len, min_len); + u64 min_len = ceph_wbc.len + 1 - thp_size(page); + ceph_wbc.len = + get_writepages_data_length(inode, + ceph_wbc.pages[i - 1], + ceph_wbc.offset); + ceph_wbc.len = max(ceph_wbc.len, min_len); + } + if (IS_ENCRYPTED(inode)) { + ceph_wbc.len = 
round_up(ceph_wbc.len, + CEPH_FSCRYPT_BLOCK_SIZE); } - if (IS_ENCRYPTED(inode)) - len = round_up(len, CEPH_FSCRYPT_BLOCK_SIZE); - doutc(cl, "got pages at %llu~%llu\n", offset, len); + doutc(cl, "got pages at %llu~%llu\n", + ceph_wbc.offset, ceph_wbc.len); if (IS_ENCRYPTED(inode) && - ((offset | len) & ~CEPH_FSCRYPT_BLOCK_MASK)) + ((ceph_wbc.offset | ceph_wbc.len) & ~CEPH_FSCRYPT_BLOCK_MASK)) pr_warn_client(cl, "bad encrypted write offset=%lld len=%llu\n", - offset, len); + ceph_wbc.offset, ceph_wbc.len); - osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len, - 0, from_pool, false); - osd_req_op_extent_update(req, op_idx, len); + osd_req_op_extent_osd_data_pages(req, ceph_wbc.op_idx, + ceph_wbc.data_pages, + ceph_wbc.len, + 0, ceph_wbc.from_pool, false); + osd_req_op_extent_update(req, ceph_wbc.op_idx, ceph_wbc.len); - BUG_ON(op_idx + 1 != req->r_num_ops); + BUG_ON(ceph_wbc.op_idx + 1 != req->r_num_ops); - from_pool = false; - if (i < locked_pages) { - BUG_ON(num_ops <= req->r_num_ops); - num_ops -= req->r_num_ops; - locked_pages -= i; + ceph_wbc.from_pool = false; + if (i < ceph_wbc.locked_pages) { + BUG_ON(ceph_wbc.num_ops <= req->r_num_ops); + ceph_wbc.num_ops -= req->r_num_ops; + ceph_wbc.locked_pages -= i; /* allocate new pages array for next request */ - data_pages = pages; - pages = kmalloc_array(locked_pages, sizeof(*pages), - GFP_NOFS); - if (!pages) { - from_pool = true; - pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS); - BUG_ON(!pages); + ceph_wbc.data_pages = ceph_wbc.pages; + ceph_wbc.pages = kmalloc_array(ceph_wbc.locked_pages, + sizeof(*ceph_wbc.pages), + GFP_NOFS); + if (!ceph_wbc.pages) { + ceph_wbc.from_pool = true; + ceph_wbc.pages = + mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS); + BUG_ON(!ceph_wbc.pages); } - memcpy(pages, data_pages + i, - locked_pages * sizeof(*pages)); - memset(data_pages + i, 0, - locked_pages * sizeof(*pages)); + memcpy(ceph_wbc.pages, ceph_wbc.data_pages + i, + ceph_wbc.locked_pages * 
sizeof(*ceph_wbc.pages)); + memset(ceph_wbc.data_pages + i, 0, + ceph_wbc.locked_pages * sizeof(*ceph_wbc.pages)); } else { - BUG_ON(num_ops != req->r_num_ops); - index = pages[i - 1]->index + 1; + BUG_ON(ceph_wbc.num_ops != req->r_num_ops); + ceph_wbc.index = ceph_wbc.pages[i - 1]->index + 1; /* request message now owns the pages array */ - pages = NULL; + ceph_wbc.pages = NULL; } req->r_mtime = inode_get_mtime(inode); @@ -1367,7 +1482,7 @@ static int ceph_writepages_start(struct address_space *mapping, req = NULL; wbc->nr_to_write -= i; - if (pages) + if (ceph_wbc.pages) goto new_request; /* @@ -1377,54 +1492,56 @@ static int ceph_writepages_start(struct address_space *mapping, * we tagged for writeback prior to entering this loop. */ if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE) - done = true; + ceph_wbc.done = true; release_folios: doutc(cl, "folio_batch release on %d folios (%p)\n", - (int)fbatch.nr, fbatch.nr ? fbatch.folios[0] : NULL); - folio_batch_release(&fbatch); + (int)ceph_wbc.fbatch.nr, + ceph_wbc.fbatch.nr ? 
ceph_wbc.fbatch.folios[0] : NULL); + folio_batch_release(&ceph_wbc.fbatch); } - if (should_loop && !done) { + if (ceph_wbc.should_loop && !ceph_wbc.done) { /* more to do; loop back to beginning of file */ doutc(cl, "looping back to beginning of file\n"); - end = start_index - 1; /* OK even when start_index == 0 */ + ceph_wbc.end = ceph_wbc.start_index - 1; /* OK even when start_index == 0 */ /* to write dirty pages associated with next snapc, * we need to wait until current writes complete */ if (wbc->sync_mode != WB_SYNC_NONE && - start_index == 0 && /* all dirty pages were checked */ + ceph_wbc.start_index == 0 && /* all dirty pages were checked */ !ceph_wbc.head_snapc) { struct page *page; unsigned i, nr; - index = 0; - while ((index <= end) && - (nr = filemap_get_folios_tag(mapping, &index, + ceph_wbc.index = 0; + while ((ceph_wbc.index <= ceph_wbc.end) && + (nr = filemap_get_folios_tag(mapping, + &ceph_wbc.index, (pgoff_t)-1, PAGECACHE_TAG_WRITEBACK, - &fbatch))) { + &ceph_wbc.fbatch))) { for (i = 0; i < nr; i++) { - page = &fbatch.folios[i]->page; - if (page_snap_context(page) != snapc) + page = &ceph_wbc.fbatch.folios[i]->page; + if (page_snap_context(page) != ceph_wbc.snapc) continue; wait_on_page_writeback(page); } - folio_batch_release(&fbatch); + folio_batch_release(&ceph_wbc.fbatch); cond_resched(); } } - start_index = 0; - index = 0; + ceph_wbc.start_index = 0; + ceph_wbc.index = 0; goto retry; } - if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0)) - mapping->writeback_index = index; + if (wbc->range_cyclic || (ceph_wbc.range_whole && wbc->nr_to_write > 0)) + mapping->writeback_index = ceph_wbc.index; out: ceph_osdc_put_request(req); - ceph_put_snap_context(last_snapc); + ceph_put_snap_context(ceph_wbc.last_snapc); doutc(cl, "%llx.%llx dend - startone, rc = %d\n", ceph_vinop(inode), rc); return rc; From ce80b76dd32764cc914975777e058d4fae4f0ea0 Mon Sep 17 00:00:00 2001 From: Viacheslav Dubeyko Date: Tue, 4 Feb 2025 16:02:47 -0800 Subject: 
[PATCH 2/4] ceph: introduce ceph_process_folio_batch() method First step of ceph_writepages_start() logic is of finding the dirty memory folios and processing it. This patch introduces ceph_process_folio_batch() method that moves this logic into dedicated method. The ceph_writepages_start() has this logic: if (ceph_wbc.locked_pages == 0) lock_page(page); /* first page */ else if (!trylock_page(page)) break; if (folio_test_writeback(folio) || folio_test_private_2(folio) /* [DEPRECATED] */) { if (wbc->sync_mode == WB_SYNC_NONE) { doutc(cl, "%p under writeback\n", folio); folio_unlock(folio); continue; } doutc(cl, "waiting on writeback %p\n", folio); folio_wait_writeback(folio); folio_wait_private_2(folio); /* [DEPRECATED] */ } The problem here that folio/page is locked here at first and it is by set_page_writeback(page) later before submitting the write request. The folio/page is unlocked by writepages_finish() after finishing the write request. It means that logic of checking folio_test_writeback() and folio_wait_writeback() never works because page is locked and it cannot be locked again until write request completion. However, for majority of folios/pages the trylock_page() is used. As a result, multiple threads can try to lock the same folios/pages multiple times even if they are under writeback already. It makes this logic more compute intensive than it is necessary. This patch changes this logic: if (folio_test_writeback(folio) || folio_test_private_2(folio) /* [DEPRECATED] */) { if (wbc->sync_mode == WB_SYNC_NONE) { doutc(cl, "%p under writeback\n", folio); folio_unlock(folio); continue; } doutc(cl, "waiting on writeback %p\n", folio); folio_wait_writeback(folio); folio_wait_private_2(folio); /* [DEPRECATED] */ } if (ceph_wbc.locked_pages == 0) lock_page(page); /* first page */ else if (!trylock_page(page)) break; This logic should exclude the ignoring of writeback state of folios/pages. 
Signed-off-by: Viacheslav Dubeyko Link: https://lore.kernel.org/r/20250205000249.123054-3-slava@dubeyko.com Tested-by: David Howells Signed-off-by: Christian Brauner --- fs/ceph/addr.c | 568 +++++++++++++++++++++++++++++++------------------ 1 file changed, 365 insertions(+), 203 deletions(-) diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index d002ff62d867..739329846a07 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -978,6 +978,27 @@ static void writepages_finish(struct ceph_osd_request *req) ceph_dec_osd_stopping_blocker(fsc->mdsc); } +static inline +bool is_forced_umount(struct address_space *mapping) +{ + struct inode *inode = mapping->host; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); + struct ceph_client *cl = fsc->client; + + if (ceph_inode_is_shutdown(inode)) { + if (ci->i_wrbuffer_ref > 0) { + pr_warn_ratelimited_client(cl, + "%llx.%llx %lld forced umount\n", + ceph_vinop(inode), ceph_ino(inode)); + } + mapping_set_error(mapping, -EIO); + return true; + } + + return false; +} + static inline unsigned int ceph_define_write_size(struct address_space *mapping) { @@ -1046,6 +1067,334 @@ void ceph_init_writeback_ctl(struct address_space *mapping, ceph_wbc->data_pages = NULL; } +static inline +int ceph_define_writeback_range(struct address_space *mapping, + struct writeback_control *wbc, + struct ceph_writeback_ctl *ceph_wbc) +{ + struct inode *inode = mapping->host; + struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); + struct ceph_client *cl = fsc->client; + + /* find oldest snap context with dirty data */ + ceph_wbc->snapc = get_oldest_context(inode, ceph_wbc, NULL); + if (!ceph_wbc->snapc) { + /* hmm, why does writepages get called when there + is no dirty data? 
*/ + doutc(cl, " no snap context with dirty data?\n"); + return -ENODATA; + } + + doutc(cl, " oldest snapc is %p seq %lld (%d snaps)\n", + ceph_wbc->snapc, ceph_wbc->snapc->seq, + ceph_wbc->snapc->num_snaps); + + ceph_wbc->should_loop = false; + + if (ceph_wbc->head_snapc && ceph_wbc->snapc != ceph_wbc->last_snapc) { + /* where to start/end? */ + if (wbc->range_cyclic) { + ceph_wbc->index = ceph_wbc->start_index; + ceph_wbc->end = -1; + if (ceph_wbc->index > 0) + ceph_wbc->should_loop = true; + doutc(cl, " cyclic, start at %lu\n", ceph_wbc->index); + } else { + ceph_wbc->index = wbc->range_start >> PAGE_SHIFT; + ceph_wbc->end = wbc->range_end >> PAGE_SHIFT; + if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX) + ceph_wbc->range_whole = true; + doutc(cl, " not cyclic, %lu to %lu\n", + ceph_wbc->index, ceph_wbc->end); + } + } else if (!ceph_wbc->head_snapc) { + /* Do not respect wbc->range_{start,end}. Dirty pages + * in that range can be associated with newer snapc. + * They are not writeable until we write all dirty pages + * associated with 'snapc' get written */ + if (ceph_wbc->index > 0) + ceph_wbc->should_loop = true; + doutc(cl, " non-head snapc, range whole\n"); + } + + ceph_put_snap_context(ceph_wbc->last_snapc); + ceph_wbc->last_snapc = ceph_wbc->snapc; + + return 0; +} + +static inline +bool has_writeback_done(struct ceph_writeback_ctl *ceph_wbc) +{ + return ceph_wbc->done && ceph_wbc->index > ceph_wbc->end; +} + +static inline +bool can_next_page_be_processed(struct ceph_writeback_ctl *ceph_wbc, + unsigned index) +{ + return index < ceph_wbc->nr_folios && + ceph_wbc->locked_pages < ceph_wbc->max_pages; +} + +static +int ceph_check_page_before_write(struct address_space *mapping, + struct writeback_control *wbc, + struct ceph_writeback_ctl *ceph_wbc, + struct folio *folio) +{ + struct inode *inode = mapping->host; + struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); + struct ceph_client *cl = fsc->client; + struct ceph_snap_context 
*pgsnapc; + struct page *page = &folio->page; + + /* only dirty pages, or our accounting breaks */ + if (unlikely(!PageDirty(page)) || unlikely(page->mapping != mapping)) { + doutc(cl, "!dirty or !mapping %p\n", page); + return -ENODATA; + } + + /* only if matching snap context */ + pgsnapc = page_snap_context(page); + if (pgsnapc != ceph_wbc->snapc) { + doutc(cl, "page snapc %p %lld != oldest %p %lld\n", + pgsnapc, pgsnapc->seq, + ceph_wbc->snapc, ceph_wbc->snapc->seq); + + if (!ceph_wbc->should_loop && !ceph_wbc->head_snapc && + wbc->sync_mode != WB_SYNC_NONE) + ceph_wbc->should_loop = true; + + return -ENODATA; + } + + if (page_offset(page) >= ceph_wbc->i_size) { + doutc(cl, "folio at %lu beyond eof %llu\n", + folio->index, ceph_wbc->i_size); + + if ((ceph_wbc->size_stable || + folio_pos(folio) >= i_size_read(inode)) && + folio_clear_dirty_for_io(folio)) + folio_invalidate(folio, 0, folio_size(folio)); + + return -ENODATA; + } + + if (ceph_wbc->strip_unit_end && + (page->index > ceph_wbc->strip_unit_end)) { + doutc(cl, "end of strip unit %p\n", page); + return -E2BIG; + } + + return 0; +} + +static inline +void __ceph_allocate_page_array(struct ceph_writeback_ctl *ceph_wbc, + unsigned int max_pages) +{ + ceph_wbc->pages = kmalloc_array(max_pages, + sizeof(*ceph_wbc->pages), + GFP_NOFS); + if (!ceph_wbc->pages) { + ceph_wbc->from_pool = true; + ceph_wbc->pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS); + BUG_ON(!ceph_wbc->pages); + } +} + +static inline +void ceph_allocate_page_array(struct address_space *mapping, + struct ceph_writeback_ctl *ceph_wbc, + struct page *page) +{ + struct inode *inode = mapping->host; + struct ceph_inode_info *ci = ceph_inode(inode); + u64 objnum; + u64 objoff; + u32 xlen; + + /* prepare async write request */ + ceph_wbc->offset = (u64)page_offset(page); + ceph_calc_file_object_mapping(&ci->i_layout, + ceph_wbc->offset, ceph_wbc->wsize, + &objnum, &objoff, &xlen); + + ceph_wbc->num_ops = 1; + ceph_wbc->strip_unit_end = 
page->index + ((xlen - 1) >> PAGE_SHIFT); + + BUG_ON(ceph_wbc->pages); + ceph_wbc->max_pages = calc_pages_for(0, (u64)xlen); + __ceph_allocate_page_array(ceph_wbc, ceph_wbc->max_pages); + + ceph_wbc->len = 0; +} + +static inline +bool is_page_index_contiguous(struct ceph_writeback_ctl *ceph_wbc, + struct page *page) +{ + return page->index == (ceph_wbc->offset + ceph_wbc->len) >> PAGE_SHIFT; +} + +static inline +bool is_num_ops_too_big(struct ceph_writeback_ctl *ceph_wbc) +{ + return ceph_wbc->num_ops >= + (ceph_wbc->from_pool ? CEPH_OSD_SLAB_OPS : CEPH_OSD_MAX_OPS); +} + +static inline +bool is_write_congestion_happened(struct ceph_fs_client *fsc) +{ + return atomic_long_inc_return(&fsc->writeback_count) > + CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb); +} + +static inline +int ceph_move_dirty_page_in_page_array(struct address_space *mapping, + struct writeback_control *wbc, + struct ceph_writeback_ctl *ceph_wbc, + struct page *page) +{ + struct inode *inode = mapping->host; + struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); + struct ceph_client *cl = fsc->client; + struct page **pages = ceph_wbc->pages; + unsigned int index = ceph_wbc->locked_pages; + gfp_t gfp_flags = ceph_wbc->locked_pages ? GFP_NOWAIT : GFP_NOFS; + + if (IS_ENCRYPTED(inode)) { + pages[index] = fscrypt_encrypt_pagecache_blocks(page, + PAGE_SIZE, + 0, + gfp_flags); + if (IS_ERR(pages[index])) { + if (PTR_ERR(pages[index]) == -EINVAL) { + pr_err_client(cl, "inode->i_blkbits=%hhu\n", + inode->i_blkbits); + } + + /* better not fail on first page! 
*/ + BUG_ON(ceph_wbc->locked_pages == 0); + + pages[index] = NULL; + return PTR_ERR(pages[index]); + } + } else { + pages[index] = page; + } + + ceph_wbc->locked_pages++; + + return 0; +} + +static +int ceph_process_folio_batch(struct address_space *mapping, + struct writeback_control *wbc, + struct ceph_writeback_ctl *ceph_wbc) +{ + struct inode *inode = mapping->host; + struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); + struct ceph_client *cl = fsc->client; + struct folio *folio = NULL; + struct page *page = NULL; + unsigned i; + int rc = 0; + + for (i = 0; can_next_page_be_processed(ceph_wbc, i); i++) { + folio = ceph_wbc->fbatch.folios[i]; + + if (!folio) + continue; + + page = &folio->page; + + doutc(cl, "? %p idx %lu, folio_test_writeback %#x, " + "folio_test_dirty %#x, folio_test_locked %#x\n", + page, page->index, folio_test_writeback(folio), + folio_test_dirty(folio), + folio_test_locked(folio)); + + if (folio_test_writeback(folio) || + folio_test_private_2(folio) /* [DEPRECATED] */) { + doutc(cl, "waiting on writeback %p\n", folio); + folio_wait_writeback(folio); + folio_wait_private_2(folio); /* [DEPRECATED] */ + continue; + } + + if (ceph_wbc->locked_pages == 0) + lock_page(page); /* first page */ + else if (!trylock_page(page)) + break; + + rc = ceph_check_page_before_write(mapping, wbc, + ceph_wbc, folio); + if (rc == -ENODATA) { + rc = 0; + unlock_page(page); + ceph_wbc->fbatch.folios[i] = NULL; + continue; + } else if (rc == -E2BIG) { + rc = 0; + unlock_page(page); + ceph_wbc->fbatch.folios[i] = NULL; + break; + } + + if (!clear_page_dirty_for_io(page)) { + doutc(cl, "%p !clear_page_dirty_for_io\n", page); + unlock_page(page); + ceph_wbc->fbatch.folios[i] = NULL; + continue; + } + + /* + * We have something to write. 
If this is + * the first locked page this time through, + * calculate max possible write size and + * allocate a page array + */ + if (ceph_wbc->locked_pages == 0) { + ceph_allocate_page_array(mapping, ceph_wbc, page); + } else if (!is_page_index_contiguous(ceph_wbc, page)) { + if (is_num_ops_too_big(ceph_wbc)) { + redirty_page_for_writepage(wbc, page); + unlock_page(page); + break; + } + + ceph_wbc->num_ops++; + ceph_wbc->offset = (u64)page_offset(page); + ceph_wbc->len = 0; + } + + /* note position of first page in fbatch */ + doutc(cl, "%llx.%llx will write page %p idx %lu\n", + ceph_vinop(inode), page, page->index); + + fsc->write_congested = is_write_congestion_happened(fsc); + + rc = ceph_move_dirty_page_in_page_array(mapping, wbc, + ceph_wbc, page); + if (rc) { + redirty_page_for_writepage(wbc, page); + unlock_page(page); + break; + } + + ceph_wbc->fbatch.folios[i] = NULL; + ceph_wbc->len += thp_size(page); + } + + ceph_wbc->processed_in_fbatch = i; + + return rc; +} + /* * initiate async writeback */ @@ -1057,7 +1406,6 @@ static int ceph_writepages_start(struct address_space *mapping, struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); struct ceph_client *cl = fsc->client; struct ceph_vino vino = ceph_vino(inode); - struct ceph_snap_context *pgsnapc; struct ceph_writeback_ctl ceph_wbc; struct ceph_osd_request *req = NULL; int rc = 0; @@ -1071,235 +1419,49 @@ static int ceph_writepages_start(struct address_space *mapping, wbc->sync_mode == WB_SYNC_NONE ? "NONE" : (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD")); - if (ceph_inode_is_shutdown(inode)) { - if (ci->i_wrbuffer_ref > 0) { - pr_warn_ratelimited_client(cl, - "%llx.%llx %lld forced umount\n", - ceph_vinop(inode), ceph_ino(inode)); - } - mapping_set_error(mapping, -EIO); - return -EIO; /* we're in a forced umount, don't write! */ + if (is_forced_umount(mapping)) { + /* we're in a forced umount, don't write! 
*/ + return -EIO; } ceph_init_writeback_ctl(mapping, wbc, &ceph_wbc); retry: - /* find oldest snap context with dirty data */ - ceph_wbc.snapc = get_oldest_context(inode, &ceph_wbc, NULL); - if (!ceph_wbc.snapc) { + rc = ceph_define_writeback_range(mapping, wbc, &ceph_wbc); + if (rc == -ENODATA) { /* hmm, why does writepages get called when there is no dirty data? */ - doutc(cl, " no snap context with dirty data?\n"); + rc = 0; goto out; } - doutc(cl, " oldest snapc is %p seq %lld (%d snaps)\n", - ceph_wbc.snapc, ceph_wbc.snapc->seq, - ceph_wbc.snapc->num_snaps); - - ceph_wbc.should_loop = false; - if (ceph_wbc.head_snapc && ceph_wbc.snapc != ceph_wbc.last_snapc) { - /* where to start/end? */ - if (wbc->range_cyclic) { - ceph_wbc.index = ceph_wbc.start_index; - ceph_wbc.end = -1; - if (ceph_wbc.index > 0) - ceph_wbc.should_loop = true; - doutc(cl, " cyclic, start at %lu\n", ceph_wbc.index); - } else { - ceph_wbc.index = wbc->range_start >> PAGE_SHIFT; - ceph_wbc.end = wbc->range_end >> PAGE_SHIFT; - if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX) - ceph_wbc.range_whole = true; - doutc(cl, " not cyclic, %lu to %lu\n", - ceph_wbc.index, ceph_wbc.end); - } - } else if (!ceph_wbc.head_snapc) { - /* Do not respect wbc->range_{start,end}. Dirty pages - * in that range can be associated with newer snapc. 
- * They are not writeable until we write all dirty pages - * associated with 'snapc' get written */ - if (ceph_wbc.index > 0) - ceph_wbc.should_loop = true; - doutc(cl, " non-head snapc, range whole\n"); - } if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages) tag_pages_for_writeback(mapping, ceph_wbc.index, ceph_wbc.end); - ceph_put_snap_context(ceph_wbc.last_snapc); - ceph_wbc.last_snapc = ceph_wbc.snapc; - - while (!ceph_wbc.done && ceph_wbc.index <= ceph_wbc.end) { + while (!has_writeback_done(&ceph_wbc)) { unsigned i; struct page *page; + ceph_wbc.locked_pages = 0; ceph_wbc.max_pages = ceph_wbc.wsize >> PAGE_SHIFT; get_more_pages: + ceph_folio_batch_reinit(&ceph_wbc); + ceph_wbc.nr_folios = filemap_get_folios_tag(mapping, &ceph_wbc.index, ceph_wbc.end, ceph_wbc.tag, &ceph_wbc.fbatch); - doutc(cl, "pagevec_lookup_range_tag got %d\n", - ceph_wbc.nr_folios); + doutc(cl, "pagevec_lookup_range_tag for tag %#x got %d\n", + ceph_wbc.tag, ceph_wbc.nr_folios); + if (!ceph_wbc.nr_folios && !ceph_wbc.locked_pages) break; - for (i = 0; i < ceph_wbc.nr_folios && - ceph_wbc.locked_pages < ceph_wbc.max_pages; i++) { - struct folio *folio = ceph_wbc.fbatch.folios[i]; - page = &folio->page; - doutc(cl, "? 
%p idx %lu\n", page, page->index); - if (ceph_wbc.locked_pages == 0) - lock_page(page); /* first page */ - else if (!trylock_page(page)) - break; - - /* only dirty pages, or our accounting breaks */ - if (unlikely(!PageDirty(page)) || - unlikely(page->mapping != mapping)) { - doutc(cl, "!dirty or !mapping %p\n", page); - unlock_page(page); - continue; - } - /* only if matching snap context */ - pgsnapc = page_snap_context(page); - if (pgsnapc != ceph_wbc.snapc) { - doutc(cl, "page snapc %p %lld != oldest %p %lld\n", - pgsnapc, pgsnapc->seq, - ceph_wbc.snapc, ceph_wbc.snapc->seq); - if (!ceph_wbc.should_loop && - !ceph_wbc.head_snapc && - wbc->sync_mode != WB_SYNC_NONE) - ceph_wbc.should_loop = true; - unlock_page(page); - continue; - } - if (page_offset(page) >= ceph_wbc.i_size) { - doutc(cl, "folio at %lu beyond eof %llu\n", - folio->index, ceph_wbc.i_size); - if ((ceph_wbc.size_stable || - folio_pos(folio) >= i_size_read(inode)) && - folio_clear_dirty_for_io(folio)) - folio_invalidate(folio, 0, - folio_size(folio)); - folio_unlock(folio); - continue; - } - if (ceph_wbc.strip_unit_end && - (page->index > ceph_wbc.strip_unit_end)) { - doutc(cl, "end of strip unit %p\n", page); - unlock_page(page); - break; - } - if (folio_test_writeback(folio) || - folio_test_private_2(folio) /* [DEPRECATED] */) { - if (wbc->sync_mode == WB_SYNC_NONE) { - doutc(cl, "%p under writeback\n", folio); - folio_unlock(folio); - continue; - } - doutc(cl, "waiting on writeback %p\n", folio); - folio_wait_writeback(folio); - folio_wait_private_2(folio); /* [DEPRECATED] */ - } - - if (!clear_page_dirty_for_io(page)) { - doutc(cl, "%p !clear_page_dirty_for_io\n", page); - unlock_page(page); - continue; - } - - /* - * We have something to write. 
If this is - * the first locked page this time through, - * calculate max possinle write size and - * allocate a page array - */ - if (ceph_wbc.locked_pages == 0) { - u64 objnum; - u64 objoff; - u32 xlen; - - /* prepare async write request */ - ceph_wbc.offset = (u64)page_offset(page); - ceph_calc_file_object_mapping(&ci->i_layout, - ceph_wbc.offset, - ceph_wbc.wsize, - &objnum, &objoff, - &xlen); - ceph_wbc.len = xlen; - - ceph_wbc.num_ops = 1; - ceph_wbc.strip_unit_end = page->index + - ((ceph_wbc.len - 1) >> PAGE_SHIFT); - - BUG_ON(ceph_wbc.pages); - ceph_wbc.max_pages = - calc_pages_for(0, (u64)ceph_wbc.len); - ceph_wbc.pages = kmalloc_array(ceph_wbc.max_pages, - sizeof(*ceph_wbc.pages), - GFP_NOFS); - if (!ceph_wbc.pages) { - ceph_wbc.from_pool = true; - ceph_wbc.pages = - mempool_alloc(ceph_wb_pagevec_pool, - GFP_NOFS); - BUG_ON(!ceph_wbc.pages); - } - - ceph_wbc.len = 0; - } else if (page->index != - (ceph_wbc.offset + ceph_wbc.len) >> PAGE_SHIFT) { - if (ceph_wbc.num_ops >= - (ceph_wbc.from_pool ? CEPH_OSD_SLAB_OPS : - CEPH_OSD_MAX_OPS)) { - redirty_page_for_writepage(wbc, page); - unlock_page(page); - break; - } - - ceph_wbc.num_ops++; - ceph_wbc.offset = (u64)page_offset(page); - ceph_wbc.len = 0; - } - - /* note position of first page in fbatch */ - doutc(cl, "%llx.%llx will write page %p idx %lu\n", - ceph_vinop(inode), page, page->index); - - if (atomic_long_inc_return(&fsc->writeback_count) > - CONGESTION_ON_THRESH( - fsc->mount_options->congestion_kb)) - fsc->write_congested = true; - - if (IS_ENCRYPTED(inode)) { - ceph_wbc.pages[ceph_wbc.locked_pages] = - fscrypt_encrypt_pagecache_blocks(page, - PAGE_SIZE, 0, - ceph_wbc.locked_pages ? - GFP_NOWAIT : GFP_NOFS); - if (IS_ERR(ceph_wbc.pages[ceph_wbc.locked_pages])) { - if (PTR_ERR(ceph_wbc.pages[ceph_wbc.locked_pages]) == -EINVAL) - pr_err_client(cl, - "inode->i_blkbits=%hhu\n", - inode->i_blkbits); - /* better not fail on first page! 
*/ - BUG_ON(ceph_wbc.locked_pages == 0); - ceph_wbc.pages[ceph_wbc.locked_pages] = NULL; - redirty_page_for_writepage(wbc, page); - unlock_page(page); - break; - } - ++ceph_wbc.locked_pages; - } else { - ceph_wbc.pages[ceph_wbc.locked_pages++] = page; - } - - ceph_wbc.fbatch.folios[i] = NULL; - ceph_wbc.len += thp_size(page); - } + rc = ceph_process_folio_batch(mapping, wbc, &ceph_wbc); + if (rc) + goto release_folios; /* did we get anything? */ if (!ceph_wbc.locked_pages) From 1551ec61dc551dab1bb40c516a5a096607aff774 Mon Sep 17 00:00:00 2001 From: Viacheslav Dubeyko Date: Tue, 4 Feb 2025 16:02:48 -0800 Subject: [PATCH 3/4] ceph: introduce ceph_submit_write() method Final responsibility of ceph_writepages_start() is to submit write requests for processed dirty folios/pages. The ceph_submit_write() summarize all this logic in one method. The generic/421 fails to finish because of the issue: Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.894678] INFO: task kworker/u48:0:11 blocked for more than 122 seconds. Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.895403] Not tainted 6.13.0-rc5+ #1 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.895867] "echo 0 > /proc/sys/kernel/hung_task_timeout_secs" disables this message. Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.896633] task:kworker/u48:0 state:D stack:0 pid:11 tgid:11 ppid:2 flags:0x00004000 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.896641] Workqueue: writeback wb_workfn (flush-ceph-24) Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897614] Call Trace: Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897620] Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897629] __schedule+0x443/0x16b0 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897637] schedule+0x2b/0x140 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897640] io_schedule+0x4c/0x80 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897643] folio_wait_bit_common+0x11b/0x310 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897646] ? 
_raw_spin_unlock_irq+0xe/0x50 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897652] ? __pfx_wake_page_function+0x10/0x10 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897655] __folio_lock+0x17/0x30 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897658] ceph_writepages_start+0xca9/0x1fb0 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897663] ? fsnotify_remove_queued_event+0x2f/0x40 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897668] do_writepages+0xd2/0x240 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897672] __writeback_single_inode+0x44/0x350 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897675] writeback_sb_inodes+0x25c/0x550 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897680] wb_writeback+0x89/0x310 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897683] ? finish_task_switch.isra.0+0x97/0x310 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897687] wb_workfn+0xb5/0x410 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897689] process_one_work+0x188/0x3d0 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897692] worker_thread+0x2b5/0x3c0 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897694] ? __pfx_worker_thread+0x10/0x10 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897696] kthread+0xe1/0x120 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897699] ? __pfx_kthread+0x10/0x10 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897701] ret_from_fork+0x43/0x70 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897705] ? __pfx_kthread+0x10/0x10 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897707] ret_from_fork_asm+0x1a/0x30 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897711] There are two problems here: if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) { rc = -EIO; goto release_folios; } (1) ceph_kill_sb() doesn't wait ending of flushing all dirty folios/pages because of racy nature of mdsc->stopping_blockers. As a result, mdsc->stopping becomes CEPH_MDSC_STOPPING_FLUSHED too early. 
(2) The ceph_inc_osd_stopping_blocker(fsc->mdsc) fails to increment mdsc->stopping_blockers. Finally, already locked folios/pages are never been unlocked and the logic tries to lock the same page second time. This patch implements refactoring of ceph_submit_write() and also it solves the second issue. Signed-off-by: Viacheslav Dubeyko Link: https://lore.kernel.org/r/20250205000249.123054-4-slava@dubeyko.com Tested-by: David Howells Signed-off-by: Christian Brauner --- fs/ceph/addr.c | 465 +++++++++++++++++++++++++++---------------------- 1 file changed, 259 insertions(+), 206 deletions(-) diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 739329846a07..02d20c000dc5 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1395,6 +1395,245 @@ int ceph_process_folio_batch(struct address_space *mapping, return rc; } +static inline +void ceph_shift_unused_folios_left(struct folio_batch *fbatch) +{ + unsigned j, n = 0; + + /* shift unused page to beginning of fbatch */ + for (j = 0; j < folio_batch_count(fbatch); j++) { + if (!fbatch->folios[j]) + continue; + + if (n < j) { + fbatch->folios[n] = fbatch->folios[j]; + } + + n++; + } + + fbatch->nr = n; +} + +static +int ceph_submit_write(struct address_space *mapping, + struct writeback_control *wbc, + struct ceph_writeback_ctl *ceph_wbc) +{ + struct inode *inode = mapping->host; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); + struct ceph_client *cl = fsc->client; + struct ceph_vino vino = ceph_vino(inode); + struct ceph_osd_request *req = NULL; + struct page *page = NULL; + bool caching = ceph_is_cache_enabled(inode); + u64 offset; + u64 len; + unsigned i; + +new_request: + offset = ceph_fscrypt_page_offset(ceph_wbc->pages[0]); + len = ceph_wbc->wsize; + + req = ceph_osdc_new_request(&fsc->client->osdc, + &ci->i_layout, vino, + offset, &len, 0, ceph_wbc->num_ops, + CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, + ceph_wbc->snapc, ceph_wbc->truncate_seq, + 
ceph_wbc->truncate_size, false); + if (IS_ERR(req)) { + req = ceph_osdc_new_request(&fsc->client->osdc, + &ci->i_layout, vino, + offset, &len, 0, + min(ceph_wbc->num_ops, + CEPH_OSD_SLAB_OPS), + CEPH_OSD_OP_WRITE, + CEPH_OSD_FLAG_WRITE, + ceph_wbc->snapc, + ceph_wbc->truncate_seq, + ceph_wbc->truncate_size, + true); + BUG_ON(IS_ERR(req)); + } + + page = ceph_wbc->pages[ceph_wbc->locked_pages - 1]; + BUG_ON(len < ceph_fscrypt_page_offset(page) + thp_size(page) - offset); + + if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) { + for (i = 0; i < folio_batch_count(&ceph_wbc->fbatch); i++) { + struct folio *folio = ceph_wbc->fbatch.folios[i]; + + if (!folio) + continue; + + page = &folio->page; + redirty_page_for_writepage(wbc, page); + unlock_page(page); + } + + for (i = 0; i < ceph_wbc->locked_pages; i++) { + page = ceph_fscrypt_pagecache_page(ceph_wbc->pages[i]); + + if (!page) + continue; + + redirty_page_for_writepage(wbc, page); + unlock_page(page); + } + + ceph_osdc_put_request(req); + return -EIO; + } + + req->r_callback = writepages_finish; + req->r_inode = inode; + + /* Format the osd request message and submit the write */ + len = 0; + ceph_wbc->data_pages = ceph_wbc->pages; + ceph_wbc->op_idx = 0; + for (i = 0; i < ceph_wbc->locked_pages; i++) { + u64 cur_offset; + + page = ceph_fscrypt_pagecache_page(ceph_wbc->pages[i]); + cur_offset = page_offset(page); + + /* + * Discontinuity in page range? Ceph can handle that by just passing + * multiple extents in the write op. + */ + if (offset + len != cur_offset) { + /* If it's full, stop here */ + if (ceph_wbc->op_idx + 1 == req->r_num_ops) + break; + + /* Kick off an fscache write with what we have so far. 
*/ + ceph_fscache_write_to_cache(inode, offset, len, caching); + + /* Start a new extent */ + osd_req_op_extent_dup_last(req, ceph_wbc->op_idx, + cur_offset - offset); + + doutc(cl, "got pages at %llu~%llu\n", offset, len); + + osd_req_op_extent_osd_data_pages(req, ceph_wbc->op_idx, + ceph_wbc->data_pages, + len, 0, + ceph_wbc->from_pool, + false); + osd_req_op_extent_update(req, ceph_wbc->op_idx, len); + + len = 0; + offset = cur_offset; + ceph_wbc->data_pages = ceph_wbc->pages + i; + ceph_wbc->op_idx++; + } + + set_page_writeback(page); + + if (caching) + ceph_set_page_fscache(page); + + len += thp_size(page); + } + + ceph_fscache_write_to_cache(inode, offset, len, caching); + + if (ceph_wbc->size_stable) { + len = min(len, ceph_wbc->i_size - offset); + } else if (i == ceph_wbc->locked_pages) { + /* writepages_finish() clears writeback pages + * according to the data length, so make sure + * data length covers all locked pages */ + u64 min_len = len + 1 - thp_size(page); + len = get_writepages_data_length(inode, + ceph_wbc->pages[i - 1], + offset); + len = max(len, min_len); + } + + if (IS_ENCRYPTED(inode)) + len = round_up(len, CEPH_FSCRYPT_BLOCK_SIZE); + + doutc(cl, "got pages at %llu~%llu\n", offset, len); + + if (IS_ENCRYPTED(inode) && + ((offset | len) & ~CEPH_FSCRYPT_BLOCK_MASK)) { + pr_warn_client(cl, + "bad encrypted write offset=%lld len=%llu\n", + offset, len); + } + + osd_req_op_extent_osd_data_pages(req, ceph_wbc->op_idx, + ceph_wbc->data_pages, len, + 0, ceph_wbc->from_pool, false); + osd_req_op_extent_update(req, ceph_wbc->op_idx, len); + + BUG_ON(ceph_wbc->op_idx + 1 != req->r_num_ops); + + ceph_wbc->from_pool = false; + if (i < ceph_wbc->locked_pages) { + BUG_ON(ceph_wbc->num_ops <= req->r_num_ops); + ceph_wbc->num_ops -= req->r_num_ops; + ceph_wbc->locked_pages -= i; + + /* allocate new pages array for next request */ + ceph_wbc->data_pages = ceph_wbc->pages; + __ceph_allocate_page_array(ceph_wbc, ceph_wbc->locked_pages); + 
memcpy(ceph_wbc->pages, ceph_wbc->data_pages + i, + ceph_wbc->locked_pages * sizeof(*ceph_wbc->pages)); + memset(ceph_wbc->data_pages + i, 0, + ceph_wbc->locked_pages * sizeof(*ceph_wbc->pages)); + } else { + BUG_ON(ceph_wbc->num_ops != req->r_num_ops); + /* request message now owns the pages array */ + ceph_wbc->pages = NULL; + } + + req->r_mtime = inode_get_mtime(inode); + ceph_osdc_start_request(&fsc->client->osdc, req); + req = NULL; + + wbc->nr_to_write -= i; + if (ceph_wbc->pages) + goto new_request; + + return 0; +} + +static +void ceph_wait_until_current_writes_complete(struct address_space *mapping, + struct writeback_control *wbc, + struct ceph_writeback_ctl *ceph_wbc) +{ + struct page *page; + unsigned i, nr; + + if (wbc->sync_mode != WB_SYNC_NONE && + ceph_wbc->start_index == 0 && /* all dirty pages were checked */ + !ceph_wbc->head_snapc) { + ceph_wbc->index = 0; + + while ((ceph_wbc->index <= ceph_wbc->end) && + (nr = filemap_get_folios_tag(mapping, + &ceph_wbc->index, + (pgoff_t)-1, + PAGECACHE_TAG_WRITEBACK, + &ceph_wbc->fbatch))) { + for (i = 0; i < nr; i++) { + page = &ceph_wbc->fbatch.folios[i]->page; + if (page_snap_context(page) != ceph_wbc->snapc) + continue; + wait_on_page_writeback(page); + } + + folio_batch_release(&ceph_wbc->fbatch); + cond_resched(); + } + } +} + /* * initiate async writeback */ @@ -1402,17 +1641,12 @@ static int ceph_writepages_start(struct address_space *mapping, struct writeback_control *wbc) { struct inode *inode = mapping->host; - struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); struct ceph_client *cl = fsc->client; - struct ceph_vino vino = ceph_vino(inode); struct ceph_writeback_ctl ceph_wbc; - struct ceph_osd_request *req = NULL; int rc = 0; - bool caching = ceph_is_cache_enabled(inode); - if (wbc->sync_mode == WB_SYNC_NONE && - fsc->write_congested) + if (wbc->sync_mode == WB_SYNC_NONE && fsc->write_congested) return 0; doutc(cl, "%llx.%llx (mode=%s)\n", 
ceph_vinop(inode), @@ -1439,9 +1673,6 @@ static int ceph_writepages_start(struct address_space *mapping, tag_pages_for_writeback(mapping, ceph_wbc.index, ceph_wbc.end); while (!has_writeback_done(&ceph_wbc)) { - unsigned i; - struct page *page; - ceph_wbc.locked_pages = 0; ceph_wbc.max_pages = ceph_wbc.wsize >> PAGE_SHIFT; @@ -1459,6 +1690,7 @@ static int ceph_writepages_start(struct address_space *mapping, if (!ceph_wbc.nr_folios && !ceph_wbc.locked_pages) break; +process_folio_batch: rc = ceph_process_folio_batch(mapping, wbc, &ceph_wbc); if (rc) goto release_folios; @@ -1466,186 +1698,29 @@ static int ceph_writepages_start(struct address_space *mapping, /* did we get anything? */ if (!ceph_wbc.locked_pages) goto release_folios; - if (i) { - unsigned j, n = 0; - /* shift unused page to beginning of fbatch */ - for (j = 0; j < ceph_wbc.nr_folios; j++) { - if (!ceph_wbc.fbatch.folios[j]) - continue; - if (n < j) { - ceph_wbc.fbatch.folios[n] = - ceph_wbc.fbatch.folios[j]; - } - n++; - } - ceph_wbc.fbatch.nr = n; - if (ceph_wbc.nr_folios && i == ceph_wbc.nr_folios && + if (ceph_wbc.processed_in_fbatch) { + ceph_shift_unused_folios_left(&ceph_wbc.fbatch); + + if (folio_batch_count(&ceph_wbc.fbatch) == 0 && ceph_wbc.locked_pages < ceph_wbc.max_pages) { doutc(cl, "reached end fbatch, trying for more\n"); - folio_batch_release(&ceph_wbc.fbatch); goto get_more_pages; } } -new_request: - ceph_wbc.offset = ceph_fscrypt_page_offset(ceph_wbc.pages[0]); - ceph_wbc.len = ceph_wbc.wsize; - - req = ceph_osdc_new_request(&fsc->client->osdc, - &ci->i_layout, vino, - ceph_wbc.offset, &ceph_wbc.len, - 0, ceph_wbc.num_ops, - CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, - ceph_wbc.snapc, ceph_wbc.truncate_seq, - ceph_wbc.truncate_size, false); - if (IS_ERR(req)) { - req = ceph_osdc_new_request(&fsc->client->osdc, - &ci->i_layout, vino, - ceph_wbc.offset, &ceph_wbc.len, - 0, min(ceph_wbc.num_ops, - CEPH_OSD_SLAB_OPS), - CEPH_OSD_OP_WRITE, - CEPH_OSD_FLAG_WRITE, - ceph_wbc.snapc, - 
ceph_wbc.truncate_seq, - ceph_wbc.truncate_size, true); - BUG_ON(IS_ERR(req)); - } - BUG_ON(ceph_wbc.len < - ceph_fscrypt_page_offset(ceph_wbc.pages[ceph_wbc.locked_pages - 1]) + - thp_size(ceph_wbc.pages[ceph_wbc.locked_pages - 1]) - - ceph_wbc.offset); - - if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) { - rc = -EIO; + rc = ceph_submit_write(mapping, wbc, &ceph_wbc); + if (rc) goto release_folios; + + ceph_wbc.locked_pages = 0; + ceph_wbc.strip_unit_end = 0; + + if (folio_batch_count(&ceph_wbc.fbatch) > 0) { + ceph_wbc.nr_folios = + folio_batch_count(&ceph_wbc.fbatch); + goto process_folio_batch; } - req->r_callback = writepages_finish; - req->r_inode = inode; - - /* Format the osd request message and submit the write */ - ceph_wbc.len = 0; - ceph_wbc.data_pages = ceph_wbc.pages; - ceph_wbc.op_idx = 0; - for (i = 0; i < ceph_wbc.locked_pages; i++) { - struct page *page = - ceph_fscrypt_pagecache_page(ceph_wbc.pages[i]); - - u64 cur_offset = page_offset(page); - /* - * Discontinuity in page range? Ceph can handle that by just passing - * multiple extents in the write op. - */ - if (ceph_wbc.offset + ceph_wbc.len != cur_offset) { - /* If it's full, stop here */ - if (ceph_wbc.op_idx + 1 == req->r_num_ops) - break; - - /* Kick off an fscache write with what we have so far. 
*/ - ceph_fscache_write_to_cache(inode, ceph_wbc.offset, - ceph_wbc.len, caching); - - /* Start a new extent */ - osd_req_op_extent_dup_last(req, ceph_wbc.op_idx, - cur_offset - - ceph_wbc.offset); - doutc(cl, "got pages at %llu~%llu\n", - ceph_wbc.offset, - ceph_wbc.len); - osd_req_op_extent_osd_data_pages(req, - ceph_wbc.op_idx, - ceph_wbc.data_pages, - ceph_wbc.len, 0, - ceph_wbc.from_pool, false); - osd_req_op_extent_update(req, ceph_wbc.op_idx, - ceph_wbc.len); - - ceph_wbc.len = 0; - ceph_wbc.offset = cur_offset; - ceph_wbc.data_pages = ceph_wbc.pages + i; - ceph_wbc.op_idx++; - } - - set_page_writeback(page); - if (caching) - ceph_set_page_fscache(page); - ceph_wbc.len += thp_size(page); - } - ceph_fscache_write_to_cache(inode, ceph_wbc.offset, - ceph_wbc.len, caching); - - if (ceph_wbc.size_stable) { - ceph_wbc.len = min(ceph_wbc.len, - ceph_wbc.i_size - ceph_wbc.offset); - } else if (i == ceph_wbc.locked_pages) { - /* writepages_finish() clears writeback pages - * according to the data length, so make sure - * data length covers all locked pages */ - u64 min_len = ceph_wbc.len + 1 - thp_size(page); - ceph_wbc.len = - get_writepages_data_length(inode, - ceph_wbc.pages[i - 1], - ceph_wbc.offset); - ceph_wbc.len = max(ceph_wbc.len, min_len); - } - if (IS_ENCRYPTED(inode)) { - ceph_wbc.len = round_up(ceph_wbc.len, - CEPH_FSCRYPT_BLOCK_SIZE); - } - - doutc(cl, "got pages at %llu~%llu\n", - ceph_wbc.offset, ceph_wbc.len); - - if (IS_ENCRYPTED(inode) && - ((ceph_wbc.offset | ceph_wbc.len) & ~CEPH_FSCRYPT_BLOCK_MASK)) - pr_warn_client(cl, - "bad encrypted write offset=%lld len=%llu\n", - ceph_wbc.offset, ceph_wbc.len); - - osd_req_op_extent_osd_data_pages(req, ceph_wbc.op_idx, - ceph_wbc.data_pages, - ceph_wbc.len, - 0, ceph_wbc.from_pool, false); - osd_req_op_extent_update(req, ceph_wbc.op_idx, ceph_wbc.len); - - BUG_ON(ceph_wbc.op_idx + 1 != req->r_num_ops); - - ceph_wbc.from_pool = false; - if (i < ceph_wbc.locked_pages) { - BUG_ON(ceph_wbc.num_ops <= 
req->r_num_ops); - ceph_wbc.num_ops -= req->r_num_ops; - ceph_wbc.locked_pages -= i; - - /* allocate new pages array for next request */ - ceph_wbc.data_pages = ceph_wbc.pages; - ceph_wbc.pages = kmalloc_array(ceph_wbc.locked_pages, - sizeof(*ceph_wbc.pages), - GFP_NOFS); - if (!ceph_wbc.pages) { - ceph_wbc.from_pool = true; - ceph_wbc.pages = - mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS); - BUG_ON(!ceph_wbc.pages); - } - memcpy(ceph_wbc.pages, ceph_wbc.data_pages + i, - ceph_wbc.locked_pages * sizeof(*ceph_wbc.pages)); - memset(ceph_wbc.data_pages + i, 0, - ceph_wbc.locked_pages * sizeof(*ceph_wbc.pages)); - } else { - BUG_ON(ceph_wbc.num_ops != req->r_num_ops); - ceph_wbc.index = ceph_wbc.pages[i - 1]->index + 1; - /* request message now owns the pages array */ - ceph_wbc.pages = NULL; - } - - req->r_mtime = inode_get_mtime(inode); - ceph_osdc_start_request(&fsc->client->osdc, req); - req = NULL; - - wbc->nr_to_write -= i; - if (ceph_wbc.pages) - goto new_request; /* * We stop writing back only if we are not doing @@ -1666,32 +1741,12 @@ static int ceph_writepages_start(struct address_space *mapping, if (ceph_wbc.should_loop && !ceph_wbc.done) { /* more to do; loop back to beginning of file */ doutc(cl, "looping back to beginning of file\n"); - ceph_wbc.end = ceph_wbc.start_index - 1; /* OK even when start_index == 0 */ + /* OK even when start_index == 0 */ + ceph_wbc.end = ceph_wbc.start_index - 1; /* to write dirty pages associated with next snapc, * we need to wait until current writes complete */ - if (wbc->sync_mode != WB_SYNC_NONE && - ceph_wbc.start_index == 0 && /* all dirty pages were checked */ - !ceph_wbc.head_snapc) { - struct page *page; - unsigned i, nr; - ceph_wbc.index = 0; - while ((ceph_wbc.index <= ceph_wbc.end) && - (nr = filemap_get_folios_tag(mapping, - &ceph_wbc.index, - (pgoff_t)-1, - PAGECACHE_TAG_WRITEBACK, - &ceph_wbc.fbatch))) { - for (i = 0; i < nr; i++) { - page = &ceph_wbc.fbatch.folios[i]->page; - if (page_snap_context(page) != 
ceph_wbc.snapc) - continue; - wait_on_page_writeback(page); - } - folio_batch_release(&ceph_wbc.fbatch); - cond_resched(); - } - } + ceph_wait_until_current_writes_complete(mapping, wbc, &ceph_wbc); ceph_wbc.start_index = 0; ceph_wbc.index = 0; @@ -1702,15 +1757,13 @@ static int ceph_writepages_start(struct address_space *mapping, mapping->writeback_index = ceph_wbc.index; out: - ceph_osdc_put_request(req); ceph_put_snap_context(ceph_wbc.last_snapc); doutc(cl, "%llx.%llx dend - startone, rc = %d\n", ceph_vinop(inode), rc); + return rc; } - - /* * See if a given @snapc is either writeable, or already written. */ From fd7449d937e7fb3144770592927cf452bf66dbd3 Mon Sep 17 00:00:00 2001 From: Viacheslav Dubeyko Date: Tue, 4 Feb 2025 16:02:49 -0800 Subject: [PATCH 4/4] ceph: fix generic/421 test failure The generic/421 fails to finish because of the issue: Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.894678] INFO: task kworker/u48:0:11 blocked for more than 122 seconds. Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.895403] Not tainted 6.13.0-rc5+ #1 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.895867] "echo 0 > /proc/sys/kernel/hung_task_timeout_secs" disables this message. Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.896633] task:kworker/u48:0 state:D stack:0 pid:11 tgid:11 ppid:2 flags:0x00004000 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.896641] Workqueue: writeback wb_workfn (flush-ceph-24) Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897614] Call Trace: Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897620] Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897629] __schedule+0x443/0x16b0 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897637] schedule+0x2b/0x140 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897640] io_schedule+0x4c/0x80 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897643] folio_wait_bit_common+0x11b/0x310 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897646] ? 
_raw_spin_unlock_irq+0xe/0x50 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897652] ? __pfx_wake_page_function+0x10/0x10 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897655] __folio_lock+0x17/0x30 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897658] ceph_writepages_start+0xca9/0x1fb0 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897663] ? fsnotify_remove_queued_event+0x2f/0x40 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897668] do_writepages+0xd2/0x240 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897672] __writeback_single_inode+0x44/0x350 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897675] writeback_sb_inodes+0x25c/0x550 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897680] wb_writeback+0x89/0x310 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897683] ? finish_task_switch.isra.0+0x97/0x310 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897687] wb_workfn+0xb5/0x410 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897689] process_one_work+0x188/0x3d0 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897692] worker_thread+0x2b5/0x3c0 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897694] ? __pfx_worker_thread+0x10/0x10 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897696] kthread+0xe1/0x120 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897699] ? __pfx_kthread+0x10/0x10 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897701] ret_from_fork+0x43/0x70 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897705] ? __pfx_kthread+0x10/0x10 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897707] ret_from_fork_asm+0x1a/0x30 Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897711] There are several issues here: (1) ceph_kill_sb() doesn't wait ending of flushing all dirty folios/pages because of racy nature of mdsc->stopping_blockers. As a result, mdsc->stopping becomes CEPH_MDSC_STOPPING_FLUSHED too early. (2) The ceph_inc_osd_stopping_blocker(fsc->mdsc) fails to increment mdsc->stopping_blockers. 
Finally, already locked folios/pages are never unlocked and the logic tries to lock the same page a second time. (3) The folio_batch of dirty pages found by filemap_get_folios_tag() is not processed properly. And this is why some dirty pages are simply never processed and we have dirty folios/pages after unmount anyway. This patch fixes the issues by means of: (1) introducing a dirty_folios counter and flush_end_wq waiting queue in struct ceph_mds_client; (2) ceph_dirty_folio() increments the dirty_folios counter; (3) writepages_finish() decrements the dirty_folios counter and wakes up all waiters on the queue if the dirty_folios counter is less than or equal to zero; (4) adding to the ceph_kill_sb() method the logic of checking the value of the dirty_folios counter and waiting if it is bigger than zero; (5) adding a ceph_inc_osd_stopping_blocker() call in the beginning of ceph_writepages_start() and ceph_dec_osd_stopping_blocker() at the end of ceph_writepages_start() with the goal to resolve the racy nature of mdsc->stopping_blockers. sudo ./check generic/421 FSTYP -- ceph PLATFORM -- Linux/x86_64 ceph-testing-0001 6.13.0+ #137 SMP PREEMPT_DYNAMIC Mon Feb 3 20:30:08 UTC 2025 MKFS_OPTIONS -- 127.0.0.1:40551:/scratch MOUNT_OPTIONS -- -o name=fs,secret=,ms_mode=crc,nowsync,copyfrom 127.0.0.1:40551:/scratch /mnt/scratch generic/421 7s ...
4s Ran: generic/421 Passed all 1 tests Signed-off-by: Viacheslav Dubeyko Link: https://lore.kernel.org/r/20250205000249.123054-5-slava@dubeyko.com Tested-by: David Howells Signed-off-by: Christian Brauner --- fs/ceph/addr.c | 20 +++++++++++++++++++- fs/ceph/mds_client.c | 2 ++ fs/ceph/mds_client.h | 3 +++ fs/ceph/super.c | 11 +++++++++++ 4 files changed, 35 insertions(+), 1 deletion(-) diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 02d20c000dc5..d82ce4867fca 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -82,6 +82,7 @@ static bool ceph_dirty_folio(struct address_space *mapping, struct folio *folio) { struct inode *inode = mapping->host; struct ceph_client *cl = ceph_inode_to_client(inode); + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); struct ceph_inode_info *ci; struct ceph_snap_context *snapc; @@ -92,6 +93,8 @@ static bool ceph_dirty_folio(struct address_space *mapping, struct folio *folio) return false; } + atomic64_inc(&mdsc->dirty_folios); + ci = ceph_inode(inode); /* dirty the head */ @@ -894,6 +897,7 @@ static void writepages_finish(struct ceph_osd_request *req) struct ceph_snap_context *snapc = req->r_snapc; struct address_space *mapping = inode->i_mapping; struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); unsigned int len = 0; bool remove_page; @@ -949,6 +953,12 @@ static void writepages_finish(struct ceph_osd_request *req) ceph_put_snap_context(detach_page_private(page)); end_page_writeback(page); + + if (atomic64_dec_return(&mdsc->dirty_folios) <= 0) { + wake_up_all(&mdsc->flush_end_wq); + WARN_ON(atomic64_read(&mdsc->dirty_folios) < 0); + } + doutc(cl, "unlocking %p\n", page); if (remove_page) @@ -1660,13 +1670,18 @@ static int ceph_writepages_start(struct address_space *mapping, ceph_init_writeback_ctl(mapping, wbc, &ceph_wbc); + if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) { + rc = -EIO; + goto out; + } + retry: rc = 
ceph_define_writeback_range(mapping, wbc, &ceph_wbc); if (rc == -ENODATA) { /* hmm, why does writepages get called when there is no dirty data? */ rc = 0; - goto out; + goto dec_osd_stopping_blocker; } if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages) @@ -1756,6 +1771,9 @@ static int ceph_writepages_start(struct address_space *mapping, if (wbc->range_cyclic || (ceph_wbc.range_whole && wbc->nr_to_write > 0)) mapping->writeback_index = ceph_wbc.index; +dec_osd_stopping_blocker: + ceph_dec_osd_stopping_blocker(fsc->mdsc); + out: ceph_put_snap_context(ceph_wbc.last_snapc); doutc(cl, "%llx.%llx dend - startone, rc = %d\n", ceph_vinop(inode), diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 54b3421501e9..230e0c3f341f 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -5489,6 +5489,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) spin_lock_init(&mdsc->stopping_lock); atomic_set(&mdsc->stopping_blockers, 0); init_completion(&mdsc->stopping_waiter); + atomic64_set(&mdsc->dirty_folios, 0); + init_waitqueue_head(&mdsc->flush_end_wq); init_waitqueue_head(&mdsc->session_close_wq); INIT_LIST_HEAD(&mdsc->waiting_for_map); mdsc->quotarealms_inodes = RB_ROOT; diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 7c9fee9e80d4..3e2a6fa7c19a 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -458,6 +458,9 @@ struct ceph_mds_client { atomic_t stopping_blockers; struct completion stopping_waiter; + atomic64_t dirty_folios; + wait_queue_head_t flush_end_wq; + atomic64_t quotarealms_count; /* # realms with quota */ /* * We keep a list of inodes we don't see in the mountpoint but that we diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 4344e1f11806..f3951253e393 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -1563,6 +1563,17 @@ static void ceph_kill_sb(struct super_block *s) */ sync_filesystem(s); + if (atomic64_read(&mdsc->dirty_folios) > 0) { + wait_queue_head_t *wq = &mdsc->flush_end_wq; + long timeleft = 
wait_event_killable_timeout(*wq, + atomic64_read(&mdsc->dirty_folios) <= 0, + fsc->client->options->mount_timeout); + if (!timeleft) /* timed out */ + pr_warn_client(cl, "umount timed out, %ld\n", timeleft); + else if (timeleft < 0) /* killed */ + pr_warn_client(cl, "umount was killed, %ld\n", timeleft); + } + spin_lock(&mdsc->stopping_lock); mdsc->stopping = CEPH_MDSC_STOPPING_FLUSHING; wait = !!atomic_read(&mdsc->stopping_blockers);