From b88b249ba708a36b976bc1635b0b8a3556f1691d Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 28 May 2024 17:12:31 -0400 Subject: [PATCH 01/17] dlm: remove scand leftovers This patch removes some leftover code related to dlm_scand, which was dropped in commit b1f2381c1a8d ("dlm: drop dlm_scand kthread and use timers"). Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/dlm_internal.h | 8 -------- fs/dlm/lockspace.c | 1 - 2 files changed, 9 deletions(-) diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 9085ba3b2f20..9618ce0720d9 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -559,13 +559,6 @@ struct rcom_lock { char rl_lvb[]; }; -/* - * The max number of resources per rsbtbl bucket that shrink will attempt - * to remove in each iteration. - */ - -#define DLM_REMOVE_NAMES_MAX 8 - struct dlm_ls { struct list_head ls_list; /* list of lockspaces */ dlm_lockspace_t *ls_local_handle; @@ -578,7 +571,6 @@ struct dlm_ls { wait_queue_head_t ls_count_wait; int ls_create_count; /* create/release refcount */ unsigned long ls_flags; /* LSFL_ */ - unsigned long ls_scan_time; struct kobject ls_kobj; struct idr ls_lkbidr; diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index 475ab4370dda..b6a1a6eb7f27 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -410,7 +410,6 @@ static int new_lockspace(const char *name, const char *cluster, atomic_set(&ls->ls_count, 0); init_waitqueue_head(&ls->ls_count_wait); ls->ls_flags = 0; - ls->ls_scan_time = jiffies; if (ops && dlm_config.ci_recover_callbacks) { ls->ls_ops = ops;
From a2155402bf0e03a3cd2ba21a6a0d82426379d8e0 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 28 May 2024 17:12:32 -0400 Subject: [PATCH 02/17] dlm: don't kref_init rsbs created for toss list This patch removes a kref_init() that isn't necessary because the rsb is created directly for the toss list. While an rsb is on the toss list it is not reference counted. If the rsb later moves to the keep list, a kref_init() for res_ref is performed at that point. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lock.c | 1 - 1 file changed, 1 deletion(-) diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index f103b8c30592..e66972ed97b1 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1385,7 +1385,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, r->res_dir_nodeid = our_nodeid; r->res_master_nodeid = from_nodeid; r->res_nodeid = from_nodeid; - kref_init(&r->res_ref); rsb_set_flag(r, RSB_TOSS); write_lock_bh(&ls->ls_rsbtbl_lock);
From f49da8c09f93ad2b220ee44091123aa9693eefde Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 28 May 2024 17:12:33 -0400 Subject: [PATCH 03/17] dlm: remove unused parameter in dlm_midcomms_addr This patch removes a parameter which is currently not used by dlm_midcomms_addr().
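[Editor's note: to illustrate the reference-counting rule behind patch 02 above: an rsb sitting on the toss list carries no references, and res_ref is only (re)initialized once the rsb moves back to the keep list. The sketch below is a simplified, non-authoritative illustration of that transition; the field and flag names are the real ones, but the helper itself is invented and is not the exact fs/dlm code.]

static void activate_rsb_sketch(struct dlm_ls *ls, struct dlm_rsb *r)
{
	/* while the rsb sits on the toss list, res_ref is never touched */
	list_move(&r->res_rsbs_list, &ls->ls_keep);
	rsb_clear_flag(r, RSB_TOSS);
	/* the only kref_init() happens on the toss -> keep transition */
	kref_init(&r->res_ref);
}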
Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/config.c | 2 +- fs/dlm/lowcomms.c | 2 +- fs/dlm/lowcomms.h | 2 +- fs/dlm/midcomms.c | 4 ++-- fs/dlm/midcomms.h | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/fs/dlm/config.c b/fs/dlm/config.c index 517fa975dc5a..99952234799e 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -672,7 +672,7 @@ static ssize_t comm_addr_store(struct config_item *item, const char *buf, memcpy(addr, buf, len); - rv = dlm_midcomms_addr(cm->nodeid, addr, len); + rv = dlm_midcomms_addr(cm->nodeid, addr); if (rv) { kfree(addr); return rv; diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 6b8078085e56..591385701c7d 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -461,7 +461,7 @@ static bool dlm_lowcomms_con_has_addr(const struct connection *con, return false; } -int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) +int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr) { struct connection *con; bool ret, idx; diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h index 8deb16f8f620..fd0df604eb93 100644 --- a/fs/dlm/lowcomms.h +++ b/fs/dlm/lowcomms.h @@ -46,7 +46,7 @@ void dlm_lowcomms_put_msg(struct dlm_msg *msg); int dlm_lowcomms_resend_msg(struct dlm_msg *msg); int dlm_lowcomms_connect_node(int nodeid); int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark); -int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len); +int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr); void dlm_midcomms_receive_done(int nodeid); struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void); struct kmem_cache *dlm_lowcomms_msg_cache_create(void); diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index c34f38e9ee5c..2c101bbe261a 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -334,12 +334,12 @@ static struct midcomms_node *nodeid2node(int nodeid) return __find_node(nodeid, nodeid_hash(nodeid)); } -int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) +int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr) { int ret, idx, r = nodeid_hash(nodeid); struct midcomms_node *node; - ret = dlm_lowcomms_addr(nodeid, addr, len); + ret = dlm_lowcomms_addr(nodeid, addr); if (ret) return ret; diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h index 278d26fdeb2c..7fad1d170bba 100644 --- a/fs/dlm/midcomms.h +++ b/fs/dlm/midcomms.h @@ -19,7 +19,7 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen); struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, char **ppc); void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name, int namelen); -int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len); +int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr); void dlm_midcomms_version_wait(void); int dlm_midcomms_close(int nodeid); int dlm_midcomms_start(void);
From 4db41bf4f04f75d5bcf52c500cbec11a2e159a06 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 28 May 2024 17:12:34 -0400 Subject: [PATCH 04/17] dlm: remove ls_local_handle from struct dlm_ls This patch removes ls_local_handle from struct dlm_ls; it only stored a pointer to the top level dlm_ls structure itself, which isn't necessary. There is lookup functionality for the lockspace in dlm_find_lockspace_local(), but the given input parameter is already the lockspace pointer.
Looking the lockspace up might be slightly safer, but passing a wrong lockspace pointer is a bug in the code anyway, so we save the additional lookup here. The dlm_ls structure can still be hidden behind the dlm_lockspace_t handle pointer. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/dlm_internal.h | 1 - fs/dlm/lockspace.c | 16 +++------------- fs/dlm/user.c | 4 ++-- 3 files changed, 5 insertions(+), 16 deletions(-) diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 9618ce0720d9..e93ed8f7addd 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -561,7 +561,6 @@ struct rcom_lock { struct dlm_ls { struct list_head ls_list; /* list of lockspaces */ - dlm_lockspace_t *ls_local_handle; uint32_t ls_global_id; /* global unique lockspace ID */ uint32_t ls_generation; uint32_t ls_exflags; diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index b6a1a6eb7f27..8155d7475c79 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -38,7 +38,7 @@ static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len) if (rc) return rc; - ls = dlm_find_lockspace_local(ls->ls_local_handle); + ls = dlm_find_lockspace_local(ls); if (!ls) return -EINVAL; @@ -265,18 +265,9 @@ struct dlm_ls *dlm_find_lockspace_global(uint32_t id) struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace) { - struct dlm_ls *ls; + struct dlm_ls *ls = lockspace; - spin_lock_bh(&lslist_lock); - list_for_each_entry(ls, &lslist, ls_list) { - if (ls->ls_local_handle == lockspace) { - atomic_inc(&ls->ls_count); - goto out; - } - } - ls = NULL; - out: - spin_unlock_bh(&lslist_lock); + atomic_inc(&ls->ls_count); return ls; } @@ -496,7 +487,6 @@ static int new_lockspace(const char *name, const char *cluster, idr_init(&ls->ls_recover_idr); spin_lock_init(&ls->ls_recover_idr_lock); ls->ls_recover_list_count = 0; - ls->ls_local_handle = ls; init_waitqueue_head(&ls->ls_wait_general); INIT_LIST_HEAD(&ls->ls_masters_list); rwlock_init(&ls->ls_masters_lock); diff --git a/fs/dlm/user.c b/fs/dlm/user.c index 3173b974e8c8..f6635a5314f4 100644 --- a/fs/dlm/user.c +++ b/fs/dlm/user.c @@ -454,7 +454,7 @@ static int device_remove_lockspace(struct dlm_lspace_params *params) if (params->flags & DLM_USER_LSFLG_FORCEFREE) force = 2; - lockspace = ls->ls_local_handle; + lockspace = ls; dlm_put_lockspace(ls); /* The final dlm_release_lockspace waits for references to go to @@ -657,7 +657,7 @@ static int device_open(struct inode *inode, struct file *file) return -ENOMEM; } - proc->lockspace = ls->ls_local_handle; + proc->lockspace = ls; INIT_LIST_HEAD(&proc->asts); INIT_LIST_HEAD(&proc->locks); INIT_LIST_HEAD(&proc->unlocking);
From 1ffefc19c4ac7c61e5acb29c7a915ce494fe448c Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 28 May 2024 17:12:36 -0400 Subject: [PATCH 05/17] dlm: drop own rsb pre allocation mechanism This patch drops the hand-rolled rsb pre-allocation mechanism. Allocation is already backed by kmem caches, so we don't need another layer on top of them running its own pre-allocation scheme.
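[Editor's note: with the pre-allocation list gone, rsb allocation becomes a plain slab allocation. Below is a condensed, non-authoritative sketch of what get_rsb_struct() reduces to after this patch; the remaining list_head and field initialization of the real function is omitted, so treat it as an illustration rather than the exact function body.]

static int get_rsb_struct_sketch(struct dlm_ls *ls, const void *name, int len,
				 struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;

	r = dlm_allocate_rsb(ls);	/* kmem_cache backed allocation */
	if (!r)
		return -ENOMEM;		/* caller handles it, no retry against a prefilled list */

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	/* remaining list_head initialization stays as before */
	*r_ret = r;
	return 0;
}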
Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/dlm_internal.h | 9 +---- fs/dlm/lock.c | 92 ++++++------------------------------------- fs/dlm/lockspace.c | 11 ------ 3 files changed, 13 insertions(+), 99 deletions(-) diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index e93ed8f7addd..61dc58bdd006 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -322,10 +322,7 @@ struct dlm_rsb { unsigned long res_toss_time; uint32_t res_first_lkid; struct list_head res_lookup; /* lkbs waiting on first */ - union { - struct list_head res_hashchain; - struct rhash_head res_node; /* rsbtbl */ - }; + struct rhash_head res_node; /* rsbtbl */ struct list_head res_grantqueue; struct list_head res_convertqueue; struct list_head res_waitqueue; @@ -596,10 +593,6 @@ struct dlm_ls { spinlock_t ls_orphans_lock; struct list_head ls_orphans; - spinlock_t ls_new_rsb_spin; - int ls_new_rsb_count; - struct list_head ls_new_rsb; /* new rsb structs */ - struct list_head ls_nodes; /* current nodes in ls */ struct list_head ls_nodes_gone; /* dead node list, recovery */ int ls_num_nodes; /* number of nodes in ls */ diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index e66972ed97b1..5ecc50a001d9 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -389,38 +389,6 @@ void dlm_put_rsb(struct dlm_rsb *r) put_rsb(r); } -static int pre_rsb_struct(struct dlm_ls *ls) -{ - struct dlm_rsb *r1, *r2; - int count = 0; - - spin_lock_bh(&ls->ls_new_rsb_spin); - if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) { - spin_unlock_bh(&ls->ls_new_rsb_spin); - return 0; - } - spin_unlock_bh(&ls->ls_new_rsb_spin); - - r1 = dlm_allocate_rsb(ls); - r2 = dlm_allocate_rsb(ls); - - spin_lock_bh(&ls->ls_new_rsb_spin); - if (r1) { - list_add(&r1->res_hashchain, &ls->ls_new_rsb); - ls->ls_new_rsb_count++; - } - if (r2) { - list_add(&r2->res_hashchain, &ls->ls_new_rsb); - ls->ls_new_rsb_count++; - } - count = ls->ls_new_rsb_count; - spin_unlock_bh(&ls->ls_new_rsb_spin); - - if (!count) - return -ENOMEM; - return 0; -} - /* connected with timer_delete_sync() in dlm_ls_stop() to stop * new timers when recovery is triggered and don't run them * again until a dlm_timer_resume() tries it again. 
@@ -652,22 +620,10 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len, struct dlm_rsb **r_ret) { struct dlm_rsb *r; - int count; - spin_lock_bh(&ls->ls_new_rsb_spin); - if (list_empty(&ls->ls_new_rsb)) { - count = ls->ls_new_rsb_count; - spin_unlock_bh(&ls->ls_new_rsb_spin); - log_debug(ls, "find_rsb retry %d %d %s", - count, dlm_config.ci_new_rsb_count, - (const char *)name); - return -EAGAIN; - } - - r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); - list_del(&r->res_hashchain); - ls->ls_new_rsb_count--; - spin_unlock_bh(&ls->ls_new_rsb_spin); + r = dlm_allocate_rsb(ls); + if (!r) + return -ENOMEM; r->res_ls = ls; r->res_length = len; @@ -792,13 +748,6 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, } retry: - if (create) { - error = pre_rsb_struct(ls); - if (error < 0) - goto out; - } - - retry_lookup: /* check if the rsb is in keep state under read lock - likely path */ read_lock_bh(&ls->ls_rsbtbl_lock); @@ -832,7 +781,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, if (!error) { if (!rsb_flag(r, RSB_TOSS)) { write_unlock_bh(&ls->ls_rsbtbl_lock); - goto retry_lookup; + goto retry; } } else { write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -898,9 +847,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, goto out; error = get_rsb_struct(ls, name, len, &r); - if (error == -EAGAIN) - goto retry; - if (error) + if (WARN_ON_ONCE(error)) goto out; r->res_hash = hash; @@ -952,7 +899,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, */ write_unlock_bh(&ls->ls_rsbtbl_lock); dlm_free_rsb(r); - goto retry_lookup; + goto retry; } else if (!error) { list_add(&r->res_rsbs_list, &ls->ls_keep); } @@ -976,11 +923,6 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, int error; retry: - error = pre_rsb_struct(ls); - if (error < 0) - goto out; - - retry_lookup: /* check if the rsb is in keep state under read lock - likely path */ read_lock_bh(&ls->ls_rsbtbl_lock); @@ -1015,7 +957,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, if (!error) { if (!rsb_flag(r, RSB_TOSS)) { write_unlock_bh(&ls->ls_rsbtbl_lock); - goto retry_lookup; + goto retry; } } else { write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -1070,10 +1012,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, */ error = get_rsb_struct(ls, name, len, &r); - if (error == -EAGAIN) { - goto retry; - } - if (error) + if (WARN_ON_ONCE(error)) goto out; r->res_hash = hash; @@ -1090,7 +1029,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, */ write_unlock_bh(&ls->ls_rsbtbl_lock); dlm_free_rsb(r); - goto retry_lookup; + goto retry; } else if (!error) { list_add(&r->res_rsbs_list, &ls->ls_keep); } @@ -1304,11 +1243,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, } retry: - error = pre_rsb_struct(ls); - if (error < 0) - return error; - - retry_lookup: /* check if the rsb is in keep state under read lock - likely path */ read_lock_bh(&ls->ls_rsbtbl_lock); @@ -1354,7 +1288,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, /* something as changed, very unlikely but * try again */ - goto retry_lookup; + goto retry; } } else { write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -1376,9 +1310,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, not_found: error = get_rsb_struct(ls, name, len, &r); - if (error == -EAGAIN) - goto retry; - if (error) + if 
(WARN_ON_ONCE(error)) goto out; r->res_hash = hash; @@ -1395,7 +1327,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, */ write_unlock_bh(&ls->ls_rsbtbl_lock); dlm_free_rsb(r); - goto retry_lookup; + goto retry; } else if (error) { write_unlock_bh(&ls->ls_rsbtbl_lock); /* should never happen */ diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index 8155d7475c79..b96f2d05992d 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -428,9 +428,6 @@ static int new_lockspace(const char *name, const char *cluster, INIT_LIST_HEAD(&ls->ls_orphans); spin_lock_init(&ls->ls_orphans_lock); - INIT_LIST_HEAD(&ls->ls_new_rsb); - spin_lock_init(&ls->ls_new_rsb_spin); - INIT_LIST_HEAD(&ls->ls_nodes); INIT_LIST_HEAD(&ls->ls_nodes_gone); ls->ls_num_nodes = 0; @@ -688,7 +685,6 @@ static void rhash_free_rsb(void *ptr, void *arg) static int release_lockspace(struct dlm_ls *ls, int force) { - struct dlm_rsb *rsb; int busy, rv; busy = lockspace_busy(ls, force); @@ -756,13 +752,6 @@ static int release_lockspace(struct dlm_ls *ls, int force) */ rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL); - while (!list_empty(&ls->ls_new_rsb)) { - rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, - res_hashchain); - list_del(&rsb->res_hashchain); - dlm_free_rsb(rsb); - } - /* * Free structures on any other lists */ From f455eb8490acab680ddee79613e511e18a59c8b1 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 28 May 2024 17:12:38 -0400 Subject: [PATCH 06/17] dlm: move lkb idr to xarray datastructure According to kernel doc idr is deprecated and xarrays should be used nowadays. This patch is moving the lkb idr implementation to xarrays. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/dlm_internal.h | 5 ++-- fs/dlm/lock.c | 30 ++++++++++++---------- fs/dlm/lockspace.c | 60 +++++++++++++++++++++---------------------- 3 files changed, 49 insertions(+), 46 deletions(-) diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 61dc58bdd006..015f2b2a83f6 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -569,8 +570,8 @@ struct dlm_ls { unsigned long ls_flags; /* LSFL_ */ struct kobject ls_kobj; - struct idr ls_lkbidr; - rwlock_t ls_lkbidr_lock; + struct xarray ls_lkbxa; + rwlock_t ls_lkbxa_lock; struct rhashtable ls_rsbtbl; rwlock_t ls_rsbtbl_lock; diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 5ecc50a001d9..a29de48849ef 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1435,11 +1435,15 @@ static void detach_lkb(struct dlm_lkb *lkb) } static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, - int start, int end) + unsigned long start, unsigned long end) { + struct xa_limit limit; struct dlm_lkb *lkb; int rv; + limit.max = end; + limit.min = start; + lkb = dlm_allocate_lkb(ls); if (!lkb) return -ENOMEM; @@ -1453,14 +1457,12 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, INIT_LIST_HEAD(&lkb->lkb_ownqueue); INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); - write_lock_bh(&ls->ls_lkbidr_lock); - rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT); - if (rv >= 0) - lkb->lkb_id = rv; - write_unlock_bh(&ls->ls_lkbidr_lock); + write_lock_bh(&ls->ls_lkbxa_lock); + rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC); + write_unlock_bh(&ls->ls_lkbxa_lock); if (rv < 0) { - log_error(ls, "create_lkb idr error %d", rv); + log_error(ls, "create_lkb xa error %d", rv); dlm_free_lkb(lkb); return rv; } @@ -1471,18 
+1473,18 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) { - return _create_lkb(ls, lkb_ret, 1, 0); + return _create_lkb(ls, lkb_ret, 1, ULONG_MAX); } static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) { struct dlm_lkb *lkb; - read_lock_bh(&ls->ls_lkbidr_lock); - lkb = idr_find(&ls->ls_lkbidr, lkid); + read_lock_bh(&ls->ls_lkbxa_lock); + lkb = xa_load(&ls->ls_lkbxa, lkid); if (lkb) kref_get(&lkb->lkb_ref); - read_unlock_bh(&ls->ls_lkbidr_lock); + read_unlock_bh(&ls->ls_lkbxa_lock); *lkb_ret = lkb; return lkb ? 0 : -ENOENT; @@ -1507,10 +1509,10 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) int rv; rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb, - &ls->ls_lkbidr_lock); + &ls->ls_lkbxa_lock); if (rv) { - idr_remove(&ls->ls_lkbidr, lkid); - write_unlock_bh(&ls->ls_lkbidr_lock); + xa_erase(&ls->ls_lkbxa, lkid); + write_unlock_bh(&ls->ls_lkbxa_lock); detach_lkb(lkb); diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index b96f2d05992d..e480dd4cd958 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -420,8 +420,8 @@ static int new_lockspace(const char *name, const char *cluster, if (error) goto out_lsfree; - idr_init(&ls->ls_lkbidr); - rwlock_init(&ls->ls_lkbidr_lock); + xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH); + rwlock_init(&ls->ls_lkbxa_lock); INIT_LIST_HEAD(&ls->ls_waiters); spin_lock_init(&ls->ls_waiters_lock); @@ -471,7 +471,7 @@ static int new_lockspace(const char *name, const char *cluster, ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS); if (!ls->ls_recover_buf) { error = -ENOMEM; - goto out_lkbidr; + goto out_lkbxa; } ls->ls_slot = 0; @@ -572,8 +572,8 @@ static int new_lockspace(const char *name, const char *cluster, spin_unlock_bh(&lslist_lock); idr_destroy(&ls->ls_recover_idr); kfree(ls->ls_recover_buf); - out_lkbidr: - idr_destroy(&ls->ls_lkbidr); + out_lkbxa: + xa_destroy(&ls->ls_lkbxa); rhashtable_destroy(&ls->ls_rsbtbl); out_lsfree: if (do_unreg) @@ -633,22 +633,8 @@ int dlm_new_user_lockspace(const char *name, const char *cluster, ops_arg, ops_result, lockspace); } -static int lkb_idr_is_local(int id, void *p, void *data) +static int lkb_idr_free(struct dlm_lkb *lkb) { - struct dlm_lkb *lkb = p; - - return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV; -} - -static int lkb_idr_is_any(int id, void *p, void *data) -{ - return 1; -} - -static int lkb_idr_free(int id, void *p, void *data) -{ - struct dlm_lkb *lkb = p; - if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) dlm_free_lvb(lkb->lkb_lvbptr); @@ -656,23 +642,34 @@ static int lkb_idr_free(int id, void *p, void *data) return 0; } -/* NOTE: We check the lkbidr here rather than the resource table. +/* NOTE: We check the lkbxa here rather than the resource table. 
This is because there may be LKBs queued as ASTs that have been unlinked from their RSBs and are pending deletion once the AST has been delivered */ static int lockspace_busy(struct dlm_ls *ls, int force) { - int rv; + struct dlm_lkb *lkb; + unsigned long id; + int rv = 0; - read_lock_bh(&ls->ls_lkbidr_lock); + read_lock_bh(&ls->ls_lkbxa_lock); if (force == 0) { - rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls); + xa_for_each(&ls->ls_lkbxa, id, lkb) { + rv = 1; + break; + } } else if (force == 1) { - rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls); + xa_for_each(&ls->ls_lkbxa, id, lkb) { + if (lkb->lkb_nodeid == 0 && + lkb->lkb_grmode != DLM_LOCK_IV) { + rv = 1; + break; + } + } } else { rv = 0; } - read_unlock_bh(&ls->ls_lkbidr_lock); + read_unlock_bh(&ls->ls_lkbxa_lock); return rv; } @@ -685,6 +682,8 @@ static void rhash_free_rsb(void *ptr, void *arg) static int release_lockspace(struct dlm_ls *ls, int force) { + struct dlm_lkb *lkb; + unsigned long id; int busy, rv; busy = lockspace_busy(ls, force); @@ -741,11 +740,12 @@ static int release_lockspace(struct dlm_ls *ls, int force) kfree(ls->ls_recover_buf); /* - * Free all lkb's in idr + * Free all lkb's in xa */ - - idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls); - idr_destroy(&ls->ls_lkbidr); + xa_for_each(&ls->ls_lkbxa, id, lkb) { + lkb_idr_free(lkb); + } + xa_destroy(&ls->ls_lkbxa); /* * Free all rsb's on rsbtbl From fa0b54f17afe5c7449b1f0de3eb8a372f637ed30 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 28 May 2024 17:12:39 -0400 Subject: [PATCH 07/17] dlm: move recover idr to xarray datastructure According to kdoc idr is deprecated and xarrays should be used nowadays. This patch is moving the recover idr implementation to xarray datastructure. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/dlm_internal.h | 7 +++-- fs/dlm/lockspace.c | 8 +++--- fs/dlm/recover.c | 61 +++++++++++++++++++++++-------------------- 3 files changed, 40 insertions(+), 36 deletions(-) diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 015f2b2a83f6..9e68e68bf0cf 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -36,7 +36,6 @@ #include #include #include -#include #include #include #include @@ -317,7 +316,7 @@ struct dlm_rsb { int res_nodeid; int res_master_nodeid; int res_dir_nodeid; - int res_id; /* for ls_recover_idr */ + unsigned long res_id; /* for ls_recover_xa */ uint32_t res_lvbseq; uint32_t res_hash; unsigned long res_toss_time; @@ -649,8 +648,8 @@ struct dlm_ls { struct list_head ls_recover_list; spinlock_t ls_recover_list_lock; int ls_recover_list_count; - struct idr ls_recover_idr; - spinlock_t ls_recover_idr_lock; + struct xarray ls_recover_xa; + spinlock_t ls_recover_xa_lock; wait_queue_head_t ls_wait_general; wait_queue_head_t ls_recover_lock_wait; spinlock_t ls_clear_proc_locks; diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index e480dd4cd958..6f1078a1c715 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -481,8 +481,8 @@ static int new_lockspace(const char *name, const char *cluster, INIT_LIST_HEAD(&ls->ls_recover_list); spin_lock_init(&ls->ls_recover_list_lock); - idr_init(&ls->ls_recover_idr); - spin_lock_init(&ls->ls_recover_idr_lock); + xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH); + spin_lock_init(&ls->ls_recover_xa_lock); ls->ls_recover_list_count = 0; init_waitqueue_head(&ls->ls_wait_general); INIT_LIST_HEAD(&ls->ls_masters_list); @@ -570,7 +570,7 @@ static int new_lockspace(const char *name, const char *cluster, 
spin_lock_bh(&lslist_lock); list_del(&ls->ls_list); spin_unlock_bh(&lslist_lock); - idr_destroy(&ls->ls_recover_idr); + xa_destroy(&ls->ls_recover_xa); kfree(ls->ls_recover_buf); out_lkbxa: xa_destroy(&ls->ls_lkbxa); @@ -736,7 +736,7 @@ static int release_lockspace(struct dlm_ls *ls, int force) dlm_delete_debug_file(ls); - idr_destroy(&ls->ls_recover_idr); + xa_destroy(&ls->ls_recover_xa); kfree(ls->ls_recover_buf); /* diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c index f493d5f30c58..d156196b9e69 100644 --- a/fs/dlm/recover.c +++ b/fs/dlm/recover.c @@ -293,73 +293,78 @@ static void recover_list_clear(struct dlm_ls *ls) spin_unlock_bh(&ls->ls_recover_list_lock); } -static int recover_idr_empty(struct dlm_ls *ls) +static int recover_xa_empty(struct dlm_ls *ls) { int empty = 1; - spin_lock_bh(&ls->ls_recover_idr_lock); + spin_lock_bh(&ls->ls_recover_xa_lock); if (ls->ls_recover_list_count) empty = 0; - spin_unlock_bh(&ls->ls_recover_idr_lock); + spin_unlock_bh(&ls->ls_recover_xa_lock); return empty; } -static int recover_idr_add(struct dlm_rsb *r) +static int recover_xa_add(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; + struct xa_limit limit = { + .min = 1, + .max = UINT_MAX, + }; + uint32_t id; int rv; - spin_lock_bh(&ls->ls_recover_idr_lock); + spin_lock_bh(&ls->ls_recover_xa_lock); if (r->res_id) { rv = -1; goto out_unlock; } - rv = idr_alloc(&ls->ls_recover_idr, r, 1, 0, GFP_NOWAIT); + rv = xa_alloc(&ls->ls_recover_xa, &id, r, limit, GFP_ATOMIC); if (rv < 0) goto out_unlock; - r->res_id = rv; + r->res_id = id; ls->ls_recover_list_count++; dlm_hold_rsb(r); rv = 0; out_unlock: - spin_unlock_bh(&ls->ls_recover_idr_lock); + spin_unlock_bh(&ls->ls_recover_xa_lock); return rv; } -static void recover_idr_del(struct dlm_rsb *r) +static void recover_xa_del(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; - spin_lock_bh(&ls->ls_recover_idr_lock); - idr_remove(&ls->ls_recover_idr, r->res_id); + spin_lock_bh(&ls->ls_recover_xa_lock); + xa_erase_bh(&ls->ls_recover_xa, r->res_id); r->res_id = 0; ls->ls_recover_list_count--; - spin_unlock_bh(&ls->ls_recover_idr_lock); + spin_unlock_bh(&ls->ls_recover_xa_lock); dlm_put_rsb(r); } -static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id) +static struct dlm_rsb *recover_xa_find(struct dlm_ls *ls, uint64_t id) { struct dlm_rsb *r; - spin_lock_bh(&ls->ls_recover_idr_lock); - r = idr_find(&ls->ls_recover_idr, (int)id); - spin_unlock_bh(&ls->ls_recover_idr_lock); + spin_lock_bh(&ls->ls_recover_xa_lock); + r = xa_load(&ls->ls_recover_xa, (int)id); + spin_unlock_bh(&ls->ls_recover_xa_lock); return r; } -static void recover_idr_clear(struct dlm_ls *ls) +static void recover_xa_clear(struct dlm_ls *ls) { struct dlm_rsb *r; - int id; + unsigned long id; - spin_lock_bh(&ls->ls_recover_idr_lock); + spin_lock_bh(&ls->ls_recover_xa_lock); - idr_for_each_entry(&ls->ls_recover_idr, r, id) { - idr_remove(&ls->ls_recover_idr, id); + xa_for_each(&ls->ls_recover_xa, id, r) { + xa_erase_bh(&ls->ls_recover_xa, id); r->res_id = 0; r->res_recover_locks_count = 0; ls->ls_recover_list_count--; @@ -372,7 +377,7 @@ static void recover_idr_clear(struct dlm_ls *ls) ls->ls_recover_list_count); ls->ls_recover_list_count = 0; } - spin_unlock_bh(&ls->ls_recover_idr_lock); + spin_unlock_bh(&ls->ls_recover_xa_lock); } @@ -470,7 +475,7 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq) set_new_master(r); error = 0; } else { - recover_idr_add(r); + recover_xa_add(r); error = dlm_send_rcom_lookup(r, dir_nodeid, seq); } @@ -551,10 
+556,10 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq, log_rinfo(ls, "dlm_recover_masters %u of %u", count, total); - error = dlm_wait_function(ls, &recover_idr_empty); + error = dlm_wait_function(ls, &recover_xa_empty); out: if (error) - recover_idr_clear(ls); + recover_xa_clear(ls); return error; } @@ -563,7 +568,7 @@ int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc) struct dlm_rsb *r; int ret_nodeid, new_master; - r = recover_idr_find(ls, le64_to_cpu(rc->rc_id)); + r = recover_xa_find(ls, le64_to_cpu(rc->rc_id)); if (!r) { log_error(ls, "dlm_recover_master_reply no id %llx", (unsigned long long)le64_to_cpu(rc->rc_id)); @@ -582,9 +587,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc) r->res_nodeid = new_master; set_new_master(r); unlock_rsb(r); - recover_idr_del(r); + recover_xa_del(r); - if (recover_idr_empty(ls)) + if (recover_xa_empty(ls)) wake_up(&ls->ls_wait_general); out: return 0;
From 4f5957a980d023405eb45bd31258fc8488a3acb1 Mon Sep 17 00:00:00 2001 From: David Teigland Date: Mon, 10 Jun 2024 15:02:31 -0500 Subject: [PATCH 08/17] dlm: change list and timer names The old terminology of "toss" and "keep" is no longer an accurate description of the rsb states and lists, so change the names to "inactive" and "active". The old names had also been copied into the scanning code, which is changed back to use the "scan" name. - "active" rsb structs have lkb's attached, and are ref counted. - "inactive" rsb structs have no lkb's attached, and are not ref counted. - "scan" list is for rsb's that can be freed after a timeout period. - "slow" lists are for infrequent iterations through active or inactive rsb structs. - inactive rsb structs that are directory records will not be put on the scan list, since they are not freed based on timeouts. - inactive rsb structs that are not directory records will be put on the scan list to be freed, since they are no longer needed.
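[Editor's note: to make the scan-list behaviour described above concrete, here is a condensed, non-authoritative sketch of how an inactive rsb is queued for later freeing. It assumes the list stays sorted by res_toss_time, so appending at the tail keeps the order (new entries always expire last), and it simplifies the timer re-arming and the dir-record exception handled by the real add_scan()/enable_scan_timer().]

static void add_scan_sketch(struct dlm_ls *ls, struct dlm_rsb *r)
{
	struct dlm_rsb *first;

	spin_lock_bh(&ls->ls_scan_lock);
	r->res_toss_time = rsb_toss_jiffies();
	list_add_tail(&r->res_scan_list, &ls->ls_scan_list);

	/* the single lockspace timer always reflects the earliest
	 * expiration on the list */
	first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
				 res_scan_list);
	if (!timer_pending(&ls->ls_scan_timer))
		mod_timer(&ls->ls_scan_timer, first->res_toss_time);
	spin_unlock_bh(&ls->ls_scan_lock);
}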
Signed-off-by: David Teigland --- fs/dlm/debug_fs.c | 10 +- fs/dlm/dlm_internal.h | 26 ++-- fs/dlm/lock.c | 308 +++++++++++++++++++----------------------- fs/dlm/lock.h | 7 +- fs/dlm/lockspace.c | 13 +- fs/dlm/member.c | 2 +- fs/dlm/recover.c | 17 +-- fs/dlm/recover.h | 2 +- fs/dlm/recoverd.c | 14 +- 9 files changed, 181 insertions(+), 218 deletions(-) diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c index 6ab3ed4074c6..7112958c2e5b 100644 --- a/fs/dlm/debug_fs.c +++ b/fs/dlm/debug_fs.c @@ -380,7 +380,7 @@ static const struct seq_operations format4_seq_ops; static int table_seq_show(struct seq_file *seq, void *iter_ptr) { - struct dlm_rsb *rsb = list_entry(iter_ptr, struct dlm_rsb, res_rsbs_list); + struct dlm_rsb *rsb = list_entry(iter_ptr, struct dlm_rsb, res_slow_list); if (seq->op == &format1_seq_ops) print_format1(rsb, seq); @@ -409,9 +409,9 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos) } if (seq->op == &format4_seq_ops) - list = &ls->ls_toss; + list = &ls->ls_slow_inactive; else - list = &ls->ls_keep; + list = &ls->ls_slow_active; read_lock_bh(&ls->ls_rsbtbl_lock); return seq_list_start(list, *pos); @@ -423,9 +423,9 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos) struct list_head *list; if (seq->op == &format4_seq_ops) - list = &ls->ls_toss; + list = &ls->ls_slow_inactive; else - list = &ls->ls_keep; + list = &ls->ls_slow_active; return seq_list_next(iter_ptr, list, pos); } diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 9e68e68bf0cf..818484315906 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -327,11 +327,11 @@ struct dlm_rsb { struct list_head res_convertqueue; struct list_head res_waitqueue; - struct list_head res_rsbs_list; + struct list_head res_slow_list; /* ls_slow_* */ + struct list_head res_scan_list; struct list_head res_root_list; /* used for recovery */ struct list_head res_masters_list; /* used for recovery */ struct list_head res_recover_list; /* used for recovery */ - struct list_head res_toss_q_list; int res_recover_locks_count; char *res_lvbptr; @@ -365,7 +365,7 @@ enum rsb_flags { RSB_RECOVER_CONVERT, RSB_RECOVER_GRANT, RSB_RECOVER_LVB_INVAL, - RSB_TOSS, + RSB_INACTIVE, }; static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag) @@ -572,20 +572,16 @@ struct dlm_ls { struct xarray ls_lkbxa; rwlock_t ls_lkbxa_lock; + /* an rsb is on rsbtl for primary locking functions, + and on a slow list for recovery/dump iteration */ struct rhashtable ls_rsbtbl; - rwlock_t ls_rsbtbl_lock; + rwlock_t ls_rsbtbl_lock; /* for ls_rsbtbl and ls_slow */ + struct list_head ls_slow_inactive; /* to iterate rsbtbl */ + struct list_head ls_slow_active; /* to iterate rsbtbl */ - struct list_head ls_toss; - struct list_head ls_keep; - - struct timer_list ls_timer; - /* this queue is ordered according the - * absolute res_toss_time jiffies time - * to mod_timer() with the first element - * if necessary. 
- */ - struct list_head ls_toss_q; - spinlock_t ls_toss_q_lock; + struct timer_list ls_scan_timer; /* based on first scan_list rsb toss_time */ + struct list_head ls_scan_list; /* rsbs ordered by res_toss_time */ + spinlock_t ls_scan_lock; spinlock_t ls_waiters_lock; struct list_head ls_waiters; /* lkbs needing a reply */ diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index a29de48849ef..f5f2ceab5a04 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -89,7 +89,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, const struct dlm_message *ms, bool local); static int receive_extralen(const struct dlm_message *ms); static void do_purge(struct dlm_ls *ls, int nodeid, int pid); -static void toss_rsb(struct kref *kref); +static void deactivate_rsb(struct kref *kref); /* * Lock compatibilty matrix - thanks Steve @@ -330,8 +330,8 @@ static inline unsigned long rsb_toss_jiffies(void) static inline void hold_rsb(struct dlm_rsb *r) { - /* rsbs in toss state never get referenced */ - WARN_ON(rsb_flag(r, RSB_TOSS)); + /* inactive rsbs are not ref counted */ + WARN_ON(rsb_flag(r, RSB_INACTIVE)); kref_get(&r->res_ref); } @@ -370,15 +370,12 @@ static inline int dlm_kref_put_write_lock_bh(struct kref *kref, return 0; } -/* When all references to the rsb are gone it's transferred to - the tossed list for later disposal. */ - static void put_rsb(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; int rv; - rv = dlm_kref_put_write_lock_bh(&r->res_ref, toss_rsb, + rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb, &ls->ls_rsbtbl_lock); if (rv) write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -391,48 +388,49 @@ void dlm_put_rsb(struct dlm_rsb *r) /* connected with timer_delete_sync() in dlm_ls_stop() to stop * new timers when recovery is triggered and don't run them - * again until a dlm_timer_resume() tries it again. + * again until a resume_scan_timer() tries it again. */ -static void __rsb_mod_timer(struct dlm_ls *ls, unsigned long jiffies) +static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies) { if (!dlm_locking_stopped(ls)) - mod_timer(&ls->ls_timer, jiffies); + mod_timer(&ls->ls_scan_timer, jiffies); } /* This function tries to resume the timer callback if a rsb - * is on the toss list and no timer is pending. It might that + * is on the scan list and no timer is pending. It might that * the first entry is on currently executed as timer callback * but we don't care if a timer queued up again and does * nothing. Should be a rare case. 
*/ -void dlm_timer_resume(struct dlm_ls *ls) +void resume_scan_timer(struct dlm_ls *ls) { struct dlm_rsb *r; - spin_lock_bh(&ls->ls_toss_q_lock); - r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); - if (r && !timer_pending(&ls->ls_timer)) - __rsb_mod_timer(ls, r->res_toss_time); - spin_unlock_bh(&ls->ls_toss_q_lock); + spin_lock_bh(&ls->ls_scan_lock); + r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + if (r && !timer_pending(&ls->ls_scan_timer)) + enable_scan_timer(ls, r->res_toss_time); + spin_unlock_bh(&ls->ls_scan_lock); } -/* ls_rsbtbl_lock must be held and being sure the rsb is in toss state */ -static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r) +/* ls_rsbtbl_lock must be held */ + +static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r) { struct dlm_rsb *first; - spin_lock_bh(&ls->ls_toss_q_lock); + spin_lock_bh(&ls->ls_scan_lock); r->res_toss_time = 0; /* if the rsb is not queued do nothing */ - if (list_empty(&r->res_toss_q_list)) + if (list_empty(&r->res_scan_list)) goto out; /* get the first element before delete */ - first = list_first_entry(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); - list_del_init(&r->res_toss_q_list); + first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + list_del_init(&r->res_scan_list); /* check if the first element was the rsb we deleted */ if (first == r) { /* try to get the new first element, if the list @@ -442,23 +440,19 @@ static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r) * if the list isn't empty and a new first element got * in place, set the new timer expire time. */ - first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); + first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); if (!first) - timer_delete(&ls->ls_timer); + timer_delete(&ls->ls_scan_timer); else - __rsb_mod_timer(ls, first->res_toss_time); + enable_scan_timer(ls, first->res_toss_time); } out: - spin_unlock_bh(&ls->ls_toss_q_lock); + spin_unlock_bh(&ls->ls_scan_lock); } -/* Caller must held ls_rsbtbl_lock and need to be called every time - * when either the rsb enters toss state or the toss state changes - * the dir/master nodeid. 
- */ -static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r) +static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r) { int our_nodeid = dlm_our_nodeid(); struct dlm_rsb *first; @@ -471,25 +465,25 @@ static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r) if (!dlm_no_directory(ls) && (r->res_master_nodeid != our_nodeid) && (dlm_dir_nodeid(r) == our_nodeid)) { - rsb_delete_toss_timer(ls, r); + del_scan(ls, r); return; } - spin_lock_bh(&ls->ls_toss_q_lock); + spin_lock_bh(&ls->ls_scan_lock); /* set the new rsb absolute expire time in the rsb */ r->res_toss_time = rsb_toss_jiffies(); - if (list_empty(&ls->ls_toss_q)) { + if (list_empty(&ls->ls_scan_list)) { /* if the queue is empty add the element and it's * our new expire time */ - list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q); - __rsb_mod_timer(ls, r->res_toss_time); + list_add_tail(&r->res_scan_list, &ls->ls_scan_list); + enable_scan_timer(ls, r->res_toss_time); } else { /* check if the rsb was already queued, if so delete * it from the toss queue */ - if (!list_empty(&r->res_toss_q_list)) - list_del(&r->res_toss_q_list); + if (!list_empty(&r->res_scan_list)) + list_del(&r->res_scan_list); /* try to get the maybe new first element and then add * to this rsb with the oldest expire time to the end @@ -497,15 +491,15 @@ static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r) * rsb expire time is our next expiration if it wasn't * the now new first elemet is our new expiration time */ - first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); - list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q); + first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + list_add_tail(&r->res_scan_list, &ls->ls_scan_list); if (!first) - __rsb_mod_timer(ls, r->res_toss_time); + enable_scan_timer(ls, r->res_toss_time); else - __rsb_mod_timer(ls, first->res_toss_time); + enable_scan_timer(ls, first->res_toss_time); } - spin_unlock_bh(&ls->ls_toss_q_lock); + spin_unlock_bh(&ls->ls_scan_lock); } /* if we hit contention we do in 250 ms a retry to trylock. @@ -515,9 +509,11 @@ static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r) */ #define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250)) -void dlm_rsb_toss_timer(struct timer_list *timer) +/* Called by lockspace scan_timer to free unused rsb's. */ + +void dlm_rsb_scan(struct timer_list *timer) { - struct dlm_ls *ls = from_timer(ls, timer, ls_timer); + struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer); int our_nodeid = dlm_our_nodeid(); struct dlm_rsb *r; int rv; @@ -525,76 +521,62 @@ void dlm_rsb_toss_timer(struct timer_list *timer) while (1) { /* interrupting point to leave iteration when * recovery waits for timer_delete_sync(), recovery - * will take care to delete everything in toss queue. + * will take care to delete everything in scan list. */ if (dlm_locking_stopped(ls)) break; - rv = spin_trylock(&ls->ls_toss_q_lock); + rv = spin_trylock(&ls->ls_scan_lock); if (!rv) { /* rearm again try timer */ - __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY); + enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY); break; } - r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); + r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); if (!r) { - /* nothing to do anymore next rsb queue will - * set next mod_timer() expire. 
- */ - spin_unlock(&ls->ls_toss_q_lock); + /* the next add_scan will enable the timer again */ + spin_unlock(&ls->ls_scan_lock); break; } - /* test if the first rsb isn't expired yet, if - * so we stop freeing rsb from toss queue as - * the order in queue is ascending to the - * absolute res_toss_time jiffies + /* + * If the first rsb is not yet expired, then stop because the + * list is sorted with nearest expiration first. */ if (time_before(jiffies, r->res_toss_time)) { /* rearm with the next rsb to expire in the future */ - __rsb_mod_timer(ls, r->res_toss_time); - spin_unlock(&ls->ls_toss_q_lock); + enable_scan_timer(ls, r->res_toss_time); + spin_unlock(&ls->ls_scan_lock); break; } /* in find_rsb_dir/nodir there is a reverse order of this * lock, however this is only a trylock if we hit some * possible contention we try it again. - * - * This lock synchronized while holding ls_toss_q_lock - * synchronize everything that rsb_delete_toss_timer() - * or rsb_mod_timer() can't run after this timer callback - * deletes the rsb from the ls_toss_q. Whereas the other - * holders have always a priority to run as this is only - * a caching handling and the other holders might to put - * this rsb out of the toss state. */ rv = write_trylock(&ls->ls_rsbtbl_lock); if (!rv) { - spin_unlock(&ls->ls_toss_q_lock); + spin_unlock(&ls->ls_scan_lock); /* rearm again try timer */ - __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY); + enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY); break; } - list_del(&r->res_rsbs_list); + list_del(&r->res_slow_list); rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, dlm_rhash_rsb_params); - /* not necessary to held the ls_rsbtbl_lock when - * calling send_remove() - */ + /* ls_rsbtbl_lock is not needed when calling send_remove() */ write_unlock(&ls->ls_rsbtbl_lock); - /* remove the rsb out of the toss queue its gone - * drom DLM now - */ - list_del_init(&r->res_toss_q_list); - spin_unlock(&ls->ls_toss_q_lock); + list_del_init(&r->res_scan_list); + spin_unlock(&ls->ls_scan_lock); - /* no rsb in this state should ever run a timer */ + /* An rsb that is a dir record for a remote master rsb + * cannot be removed, and should not have a timer enabled. + */ WARN_ON(!dlm_no_directory(ls) && (r->res_master_nodeid != our_nodeid) && (dlm_dir_nodeid(r) == our_nodeid)); @@ -608,7 +590,7 @@ void dlm_rsb_toss_timer(struct timer_list *timer) (dlm_dir_nodeid(r) != our_nodeid)) send_remove(r); - free_toss_rsb(r); + free_inactive_rsb(r); } } @@ -635,7 +617,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len, INIT_LIST_HEAD(&r->res_convertqueue); INIT_LIST_HEAD(&r->res_waitqueue); INIT_LIST_HEAD(&r->res_root_list); - INIT_LIST_HEAD(&r->res_toss_q_list); + INIT_LIST_HEAD(&r->res_scan_list); INIT_LIST_HEAD(&r->res_recover_list); INIT_LIST_HEAD(&r->res_masters_list); @@ -689,7 +671,7 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash) * So, if the given rsb is on the toss list, it is moved to the keep list * before being returned. * - * toss_rsb() happens when all local usage of the rsb is done, i.e. no + * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no * more refcounts exist, so the rsb is moved from the keep list to the * toss list. * @@ -737,9 +719,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, * * If someone sends us a request, we are the dir node, and we do * not find the rsb anywhere, then recreate it. 
This happens if - * someone sends us a request after we have removed/freed an rsb - * from our toss list. (They sent a request instead of lookup - * because they are using an rsb from their toss list.) + * someone sends us a request after we have removed/freed an rsb. + * (They sent a request instead of lookup because they are using + * an rsb taken from their scan list.) */ if (from_local || from_dir || @@ -749,7 +731,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, retry: - /* check if the rsb is in keep state under read lock - likely path */ + /* check if the rsb is active under read lock - likely path */ read_lock_bh(&ls->ls_rsbtbl_lock); error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (error) { @@ -761,9 +743,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, * rsb is active, so we can't check master_nodeid without lock_rsb. */ - if (rsb_flag(r, RSB_TOSS)) { + if (rsb_flag(r, RSB_INACTIVE)) { read_unlock_bh(&ls->ls_rsbtbl_lock); - goto do_toss; + goto do_inactive; } kref_get(&r->res_ref); @@ -771,15 +753,15 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, goto out; - do_toss: + do_inactive: write_lock_bh(&ls->ls_rsbtbl_lock); - /* retry lookup under write lock to see if its still in toss state - * if not it's in keep state and we relookup - unlikely path. + /* retry lookup under write lock to see if its still in inactive state + * if not it's in active state and we relookup - unlikely path. */ error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (!error) { - if (!rsb_flag(r, RSB_TOSS)) { + if (!rsb_flag(r, RSB_INACTIVE)) { write_unlock_bh(&ls->ls_rsbtbl_lock); goto retry; } @@ -791,14 +773,14 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, /* * rsb found inactive (master_nodeid may be out of date unless * we are the dir_nodeid or were the master) No other thread - * is using this rsb because it's on the toss list, so we can + * is using this rsb because it's inactive, so we can * look at or update res_master_nodeid without lock_rsb. 
*/ if ((r->res_master_nodeid != our_nodeid) && from_other) { /* our rsb was not master, and another node (not the dir node) has sent us a request */ - log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s", + log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s", from_nodeid, r->res_master_nodeid, dir_nodeid, r->res_name); write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -808,7 +790,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, if ((r->res_master_nodeid != our_nodeid) && from_dir) { /* don't think this should ever happen */ - log_error(ls, "find_rsb toss from_dir %d master %d", + log_error(ls, "find_rsb inactive from_dir %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); /* fix it and go on */ @@ -825,14 +807,10 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, r->res_first_lkid = 0; } - list_move(&r->res_rsbs_list, &ls->ls_keep); - rsb_clear_flag(r, RSB_TOSS); - /* rsb got out of toss state, it becomes alive again - * and we reinit the reference counter that is only - * valid for keep state rsbs - */ + list_move(&r->res_slow_list, &ls->ls_slow_active); + rsb_clear_flag(r, RSB_INACTIVE); kref_init(&r->res_ref); - rsb_delete_toss_timer(ls, r); + del_scan(ls, r); write_unlock_bh(&ls->ls_rsbtbl_lock); goto out; @@ -901,7 +879,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, dlm_free_rsb(r); goto retry; } else if (!error) { - list_add(&r->res_rsbs_list, &ls->ls_keep); + list_add(&r->res_slow_list, &ls->ls_slow_active); } write_unlock_bh(&ls->ls_rsbtbl_lock); out: @@ -924,7 +902,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, retry: - /* check if the rsb is in keep state under read lock - likely path */ + /* check if the rsb is in active state under read lock - likely path */ read_lock_bh(&ls->ls_rsbtbl_lock); error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (error) { @@ -932,9 +910,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, goto do_new; } - if (rsb_flag(r, RSB_TOSS)) { + if (rsb_flag(r, RSB_INACTIVE)) { read_unlock_bh(&ls->ls_rsbtbl_lock); - goto do_toss; + goto do_inactive; } /* @@ -947,15 +925,15 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, goto out; - do_toss: + do_inactive: write_lock_bh(&ls->ls_rsbtbl_lock); - /* retry lookup under write lock to see if its still in toss state - * if not it's in keep state and we relookup - unlikely path. + /* retry lookup under write lock to see if its still inactive. + * if it's active, repeat lookup - unlikely path. */ error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (!error) { - if (!rsb_flag(r, RSB_TOSS)) { + if (!rsb_flag(r, RSB_INACTIVE)) { write_unlock_bh(&ls->ls_rsbtbl_lock); goto retry; } @@ -967,14 +945,14 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, /* * rsb found inactive. No other thread is using this rsb because - * it's on the toss list, so we can look at or update - * res_master_nodeid without lock_rsb. + * it's inactive, so we can look at or update res_master_nodeid + * without lock_rsb. 
*/ if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) { /* our rsb is not master, and another node has sent us a request; this should never happen */ - log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d", + log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d", from_nodeid, r->res_master_nodeid, dir_nodeid); dlm_print_rsb(r); write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -986,21 +964,17 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, (dir_nodeid == our_nodeid)) { /* our rsb is not master, and we are dir; may as well fix it; this should never happen */ - log_error(ls, "find_rsb toss our %d master %d dir %d", + log_error(ls, "find_rsb inactive our %d master %d dir %d", our_nodeid, r->res_master_nodeid, dir_nodeid); dlm_print_rsb(r); r->res_master_nodeid = our_nodeid; r->res_nodeid = 0; } - list_move(&r->res_rsbs_list, &ls->ls_keep); - rsb_clear_flag(r, RSB_TOSS); - /* rsb got out of toss state, it becomes alive again - * and we reinit the reference counter that is only - * valid for keep state rsbs - */ + list_move(&r->res_slow_list, &ls->ls_slow_active); + rsb_clear_flag(r, RSB_INACTIVE); kref_init(&r->res_ref); - rsb_delete_toss_timer(ls, r); + del_scan(ls, r); write_unlock_bh(&ls->ls_rsbtbl_lock); goto out; @@ -1031,7 +1005,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, dlm_free_rsb(r); goto retry; } else if (!error) { - list_add(&r->res_rsbs_list, &ls->ls_keep); + list_add(&r->res_slow_list, &ls->ls_slow_active); } write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -1105,7 +1079,7 @@ static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r, } static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid, - int from_nodeid, bool toss_list, unsigned int flags, + int from_nodeid, bool is_inactive, unsigned int flags, int *r_nodeid, int *result) { int fix_master = (flags & DLM_LU_RECOVER_MASTER); @@ -1129,9 +1103,9 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no r->res_nodeid = from_nodeid; rsb_set_flag(r, RSB_NEW_MASTER); - if (toss_list) { - /* I don't think we should ever find it on toss list. */ - log_error(ls, "%s fix_master on toss", __func__); + if (is_inactive) { + /* I don't think we should ever find it inactive. 
*/ + log_error(ls, "%s fix_master inactive", __func__); dlm_dump_rsb(r); } } @@ -1171,7 +1145,7 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no if (!from_master && !fix_master && (r->res_master_nodeid == from_nodeid)) { /* this can happen when the master sends remove, the dir node - * finds the rsb on the keep list and ignores the remove, + * finds the rsb on the active list and ignores the remove, * and the former master sends a lookup */ @@ -1244,13 +1218,13 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, retry: - /* check if the rsb is in keep state under read lock - likely path */ + /* check if the rsb is active under read lock - likely path */ read_lock_bh(&ls->ls_rsbtbl_lock); error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (!error) { - if (rsb_flag(r, RSB_TOSS)) { + if (rsb_flag(r, RSB_INACTIVE)) { read_unlock_bh(&ls->ls_rsbtbl_lock); - goto do_toss; + goto do_inactive; } /* because the rsb is active, we need to lock_rsb before @@ -1274,16 +1248,13 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, goto not_found; } - do_toss: + do_inactive: /* unlikely path - relookup under write */ write_lock_bh(&ls->ls_rsbtbl_lock); - /* rsb_mod_timer() requires to held ls_rsbtbl_lock in write lock - * check if the rsb is still in toss state, if not relookup - */ error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (!error) { - if (!rsb_flag(r, RSB_TOSS)) { + if (!rsb_flag(r, RSB_INACTIVE)) { write_unlock_bh(&ls->ls_rsbtbl_lock); /* something as changed, very unlikely but * try again @@ -1295,15 +1266,13 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, goto not_found; } - /* because the rsb is inactive (on toss list), it's not refcounted - * and lock_rsb is not used, but is protected by the rsbtbl lock - */ + /* because the rsb is inactive, it's not refcounted and lock_rsb + is not used, but is protected by the rsbtbl lock */ __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags, r_nodeid, result); - rsb_mod_timer(ls, r); - /* the rsb was inactive (on toss list) */ + add_scan(ls, r); write_unlock_bh(&ls->ls_rsbtbl_lock); return 0; @@ -1317,7 +1286,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, r->res_dir_nodeid = our_nodeid; r->res_master_nodeid = from_nodeid; r->res_nodeid = from_nodeid; - rsb_set_flag(r, RSB_TOSS); + rsb_set_flag(r, RSB_INACTIVE); write_lock_bh(&ls->ls_rsbtbl_lock); error = rsb_insert(r, &ls->ls_rsbtbl); @@ -1335,8 +1304,8 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, goto retry; } - list_add(&r->res_rsbs_list, &ls->ls_toss); - rsb_mod_timer(ls, r); + list_add(&r->res_slow_list, &ls->ls_slow_inactive); + add_scan(ls, r); write_unlock_bh(&ls->ls_rsbtbl_lock); if (result) @@ -1351,7 +1320,7 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) struct dlm_rsb *r; read_lock_bh(&ls->ls_rsbtbl_lock); - list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { if (r->res_hash == hash) dlm_dump_rsb(r); } @@ -1373,15 +1342,15 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len) read_unlock_bh(&ls->ls_rsbtbl_lock); } -static void toss_rsb(struct kref *kref) +static void deactivate_rsb(struct kref *kref) { struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); struct dlm_ls *ls = r->res_ls; DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); - 
rsb_set_flag(r, RSB_TOSS); - list_move(&r->res_rsbs_list, &ls->ls_toss); - rsb_mod_timer(ls, r); + rsb_set_flag(r, RSB_INACTIVE); + list_move(&r->res_slow_list, &ls->ls_slow_inactive); + add_scan(ls, r); if (r->res_lvbptr) { dlm_free_lvb(r->res_lvbptr); @@ -1395,22 +1364,22 @@ static void unhold_rsb(struct dlm_rsb *r) { int rv; - /* rsbs in toss state never get referenced */ - WARN_ON(rsb_flag(r, RSB_TOSS)); - rv = kref_put(&r->res_ref, toss_rsb); + /* inactive rsbs are not ref counted */ + WARN_ON(rsb_flag(r, RSB_INACTIVE)); + rv = kref_put(&r->res_ref, deactivate_rsb); DLM_ASSERT(!rv, dlm_dump_rsb(r);); } -void free_toss_rsb(struct dlm_rsb *r) +void free_inactive_rsb(struct dlm_rsb *r) { - WARN_ON_ONCE(!rsb_flag(r, RSB_TOSS)); + WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE)); DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r);); - DLM_ASSERT(list_empty(&r->res_toss_q_list), dlm_dump_rsb(r);); + DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r);); @@ -4256,8 +4225,9 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) return; } - /* Look for name in rsb toss state, if it's there, kill it. - * If it's in non toss state, it's being used, and we should ignore this + /* + * Look for inactive rsb, if it's there, free it. + * If the rsb is active, it's being used, and we should ignore this * message. This is an expected race between the dir node sending a * request to the master node at the same time as the master node sends * a remove to the dir node. The resolution to that race is for the @@ -4280,16 +4250,18 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) return; } - if (!rsb_flag(r, RSB_TOSS)) { + if (!rsb_flag(r, RSB_INACTIVE)) { if (r->res_master_nodeid != from_nodeid) { /* should not happen */ - log_error(ls, "receive_remove keep from %d master %d", + log_error(ls, "receive_remove on active rsb from %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); write_unlock_bh(&ls->ls_rsbtbl_lock); return; } + /* Ignore the remove message, see race comment above. 
*/ + log_debug(ls, "receive_remove from %d master %d first %x %s", from_nodeid, r->res_master_nodeid, r->res_first_lkid, name); @@ -4298,19 +4270,19 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) } if (r->res_master_nodeid != from_nodeid) { - log_error(ls, "receive_remove toss from %d master %d", + log_error(ls, "receive_remove inactive from %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); write_unlock_bh(&ls->ls_rsbtbl_lock); return; } - list_del(&r->res_rsbs_list); + list_del(&r->res_slow_list); rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, dlm_rhash_rsb_params); write_unlock_bh(&ls->ls_rsbtbl_lock); - free_toss_rsb(r); + free_inactive_rsb(r); } static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms) @@ -5377,7 +5349,7 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls) struct dlm_rsb *r; read_lock_bh(&ls->ls_rsbtbl_lock); - list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { if (!rsb_flag(r, RSB_RECOVER_GRANT)) continue; if (!is_master(r)) { diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h index 8de9dee4c058..4ed8d36f9c6d 100644 --- a/fs/dlm/lock.h +++ b/fs/dlm/lock.h @@ -11,7 +11,6 @@ #ifndef __LOCK_DOT_H__ #define __LOCK_DOT_H__ -void dlm_rsb_toss_timer(struct timer_list *timer); void dlm_dump_rsb(struct dlm_rsb *r); void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len); void dlm_print_lkb(struct dlm_lkb *lkb); @@ -19,15 +18,15 @@ void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms, uint32_t saved_seq); void dlm_receive_buffer(const union dlm_packet *p, int nodeid); int dlm_modes_compat(int mode1, int mode2); -void free_toss_rsb(struct dlm_rsb *r); +void free_inactive_rsb(struct dlm_rsb *r); void dlm_put_rsb(struct dlm_rsb *r); void dlm_hold_rsb(struct dlm_rsb *r); int dlm_put_lkb(struct dlm_lkb *lkb); -void dlm_scan_rsbs(struct dlm_ls *ls); int dlm_lock_recovery_try(struct dlm_ls *ls); void dlm_lock_recovery(struct dlm_ls *ls); void dlm_unlock_recovery(struct dlm_ls *ls); -void dlm_timer_resume(struct dlm_ls *ls); +void dlm_rsb_scan(struct timer_list *timer); +void resume_scan_timer(struct dlm_ls *ls); int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, int len, unsigned int flags, int *r_nodeid, int *result); diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index 6f1078a1c715..3990880faea7 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -412,8 +412,8 @@ static int new_lockspace(const char *name, const char *cluster, */ ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL)); - INIT_LIST_HEAD(&ls->ls_toss); - INIT_LIST_HEAD(&ls->ls_keep); + INIT_LIST_HEAD(&ls->ls_slow_inactive); + INIT_LIST_HEAD(&ls->ls_slow_active); rwlock_init(&ls->ls_rsbtbl_lock); error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params); @@ -490,10 +490,9 @@ static int new_lockspace(const char *name, const char *cluster, INIT_LIST_HEAD(&ls->ls_dir_dump_list); rwlock_init(&ls->ls_dir_dump_lock); - INIT_LIST_HEAD(&ls->ls_toss_q); - spin_lock_init(&ls->ls_toss_q_lock); - timer_setup(&ls->ls_timer, dlm_rsb_toss_timer, - TIMER_DEFERRABLE); + INIT_LIST_HEAD(&ls->ls_scan_list); + spin_lock_init(&ls->ls_scan_lock); + timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE); spin_lock_bh(&lslist_lock); ls->ls_create_count = 1; @@ -723,7 +722,7 @@ static int release_lockspace(struct dlm_ls *ls, int force) * time_shutdown_sync(), we don't care anymore */ clear_bit(LSFL_RUNNING, 
&ls->ls_flags); - timer_shutdown_sync(&ls->ls_timer); + timer_shutdown_sync(&ls->ls_scan_timer); if (ls_count == 1) { dlm_clear_members(ls); diff --git a/fs/dlm/member.c b/fs/dlm/member.c index c46e306f2e5c..a7ee7fd2b9d3 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -642,7 +642,7 @@ int dlm_ls_stop(struct dlm_ls *ls) set_bit(LSFL_RECOVER_STOP, &ls->ls_flags); new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags); if (new) - timer_delete_sync(&ls->ls_timer); + timer_delete_sync(&ls->ls_scan_timer); ls->ls_recover_seq++; /* activate requestqueue and stop processing */ diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c index d156196b9e69..c7afb428a2b4 100644 --- a/fs/dlm/recover.c +++ b/fs/dlm/recover.c @@ -882,29 +882,26 @@ void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list) log_rinfo(ls, "dlm_recover_rsbs %d done", count); } -/* Create a single list of all root rsb's to be used during recovery */ - -void dlm_clear_toss(struct dlm_ls *ls) +void dlm_clear_inactive(struct dlm_ls *ls) { struct dlm_rsb *r, *safe; unsigned int count = 0; write_lock_bh(&ls->ls_rsbtbl_lock); - list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) { - list_del(&r->res_rsbs_list); + list_for_each_entry_safe(r, safe, &ls->ls_slow_inactive, res_slow_list) { + list_del(&r->res_slow_list); rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, dlm_rhash_rsb_params); - /* remove it from the toss queue if its part of it */ - if (!list_empty(&r->res_toss_q_list)) - list_del_init(&r->res_toss_q_list); + if (!list_empty(&r->res_scan_list)) + list_del_init(&r->res_scan_list); - free_toss_rsb(r); + free_inactive_rsb(r); count++; } write_unlock_bh(&ls->ls_rsbtbl_lock); if (count) - log_rinfo(ls, "dlm_clear_toss %u done", count); + log_rinfo(ls, "dlm_clear_inactive %u done", count); } diff --git a/fs/dlm/recover.h b/fs/dlm/recover.h index efc79a6e577d..ec69896462fb 100644 --- a/fs/dlm/recover.h +++ b/fs/dlm/recover.h @@ -25,7 +25,7 @@ int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc); int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq, const struct list_head *root_list); void dlm_recovered_lock(struct dlm_rsb *r); -void dlm_clear_toss(struct dlm_ls *ls); +void dlm_clear_inactive(struct dlm_ls *ls); void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list); #endif /* __RECOVER_DOT_H__ */ diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c index 17a40d1e6036..34f4f9f49a6c 100644 --- a/fs/dlm/recoverd.c +++ b/fs/dlm/recoverd.c @@ -33,7 +33,7 @@ static int dlm_create_masters_list(struct dlm_ls *ls) } read_lock_bh(&ls->ls_rsbtbl_lock); - list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { if (r->res_nodeid) continue; @@ -63,12 +63,12 @@ static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list) struct dlm_rsb *r; read_lock_bh(&ls->ls_rsbtbl_lock); - list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { list_add(&r->res_root_list, root_list); dlm_hold_rsb(r); } - WARN_ON_ONCE(!list_empty(&ls->ls_toss)); + WARN_ON_ONCE(!list_empty(&ls->ls_slow_inactive)); read_unlock_bh(&ls->ls_rsbtbl_lock); } @@ -98,16 +98,16 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq) spin_lock_bh(&ls->ls_recover_lock); if (ls->ls_recover_seq == seq) { set_bit(LSFL_RUNNING, &ls->ls_flags); - /* Schedule next timer if recovery put something on toss. + /* Schedule next timer if recovery put something on inactive. 
* * The rsbs that was queued while recovery on toss hasn't * started yet because LSFL_RUNNING was set everything * else recovery hasn't started as well because ls_in_recovery * is still hold. So we should not run into the case that - * dlm_timer_resume() queues a timer that can occur in + * resume_scan_timer() queues a timer that can occur in * a no op. */ - dlm_timer_resume(ls); + resume_scan_timer(ls); /* unblocks processes waiting to enter the dlm */ up_write(&ls->ls_in_recovery); clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags); @@ -131,7 +131,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) dlm_callback_suspend(ls); - dlm_clear_toss(ls); + dlm_clear_inactive(ls); /* * This list of root rsb's will be the basis of most of the recovery From c217adfc8caad240ec7bed446a6a1a801d5acc6d Mon Sep 17 00:00:00 2001 From: David Teigland Date: Mon, 10 Jun 2024 15:18:09 -0500 Subject: [PATCH 09/17] dlm: fix add_scan and del_scan usage Remove a few calls to add_scan() and del_scan() in cases where the rsb is a dir record, so the rsb should never be placed on the scan list at all. Add WARN_ON to catch cases where this is done. Signed-off-by: David Teigland --- fs/dlm/lock.c | 57 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 35 insertions(+), 22 deletions(-) diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index f5f2ceab5a04..5ca3f29bef7d 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -420,6 +420,9 @@ static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r) { struct dlm_rsb *first; + /* active rsbs should never be on the scan list */ + WARN_ON(!rsb_flag(r, RSB_INACTIVE)); + spin_lock_bh(&ls->ls_scan_lock); r->res_toss_time = 0; @@ -457,17 +460,16 @@ static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r) int our_nodeid = dlm_our_nodeid(); struct dlm_rsb *first; - /* If we're the directory record for this rsb, and - * we're not the master of it, then we need to wait - * for the master node to send us a dir remove for - * before removing the dir record. - */ - if (!dlm_no_directory(ls) && - (r->res_master_nodeid != our_nodeid) && - (dlm_dir_nodeid(r) == our_nodeid)) { - del_scan(ls, r); - return; - } + /* A dir record for a remote master rsb should never be on the scan list. */ + WARN_ON(!dlm_no_directory(ls) && + (r->res_master_nodeid != our_nodeid) && + (dlm_dir_nodeid(r) == our_nodeid)); + + /* An active rsb should never be on the scan list. */ + WARN_ON(!rsb_flag(r, RSB_INACTIVE)); + + /* An rsb should not already be on the scan list. */ + WARN_ON(!list_empty(&r->res_scan_list)); spin_lock_bh(&ls->ls_scan_lock); /* set the new rsb absolute expire time in the rsb */ @@ -479,12 +481,6 @@ static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r) list_add_tail(&r->res_scan_list, &ls->ls_scan_list); enable_scan_timer(ls, r->res_toss_time); } else { - /* check if the rsb was already queued, if so delete - * it from the toss queue - */ - if (!list_empty(&r->res_scan_list)) - list_del(&r->res_scan_list); - /* try to get the maybe new first element and then add * to this rsb with the oldest expire time to the end * of the queue. If the list was empty before this @@ -807,10 +803,12 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, r->res_first_lkid = 0; } + /* A dir record will not be on the scan list. 
*/ + if (r->res_dir_nodeid != our_nodeid) + del_scan(ls, r); list_move(&r->res_slow_list, &ls->ls_slow_active); rsb_clear_flag(r, RSB_INACTIVE); - kref_init(&r->res_ref); - del_scan(ls, r); + kref_init(&r->res_ref); /* ref is now used in active state */ write_unlock_bh(&ls->ls_rsbtbl_lock); goto out; @@ -1272,7 +1270,10 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags, r_nodeid, result); - add_scan(ls, r); + /* A dir record rsb should never be on scan list. */ + /* Try to fix this with del_scan? */ + WARN_ON(!list_empty(&r->res_scan_list)); + write_unlock_bh(&ls->ls_rsbtbl_lock); return 0; @@ -1305,7 +1306,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, } list_add(&r->res_slow_list, &ls->ls_slow_inactive); - add_scan(ls, r); write_unlock_bh(&ls->ls_rsbtbl_lock); if (result) @@ -1346,11 +1346,24 @@ static void deactivate_rsb(struct kref *kref) { struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); struct dlm_ls *ls = r->res_ls; + int our_nodeid = dlm_our_nodeid(); DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); rsb_set_flag(r, RSB_INACTIVE); list_move(&r->res_slow_list, &ls->ls_slow_inactive); - add_scan(ls, r); + + /* + * When the rsb becomes unused: + * - If it's not a dir record for a remote master rsb, + * then it is put on the scan list to be freed. + * - If it's a dir record for a remote master rsb, + * then it is kept in the inactive state until + * receive_remove() from the master node. + */ + if (!dlm_no_directory(ls) && + (r->res_master_nodeid != our_nodeid) && + (dlm_dir_nodeid(r) != our_nodeid)) + add_scan(ls, r); if (r->res_lvbptr) { dlm_free_lvb(r->res_lvbptr); From 01fdeca1cc2dd705b1391f31a2594214c8bd7886 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Mon, 10 Jun 2024 15:26:03 -0500 Subject: [PATCH 10/17] dlm: use rcu to avoid an extra rsb struct lookup Use rcu to free rsb structs, and hold the rcu read lock while looking up rsb structs. This allows us to avoid an extra hash table lookup for an rsb. A new rsb flag HASHED is added which is set while the rsb is in the hash table. This flag is checked in place of repeating the hash table lookup. 
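As a rough illustration (not the exact fs/dlm code; the helper name and the error handling below are assumed for the example), the lookup pattern enabled by RSB_HASHED looks like this:

static int example_find_rsb(struct dlm_ls *ls, const void *name, int len,
			    struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	rcu_read_lock();	/* rsb memory cannot be freed while held */

	read_lock_bh(&ls->ls_rsbtbl_lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	read_unlock_bh(&ls->ls_rsbtbl_lock);
	if (error)
		goto out;	/* not found, caller creates a new rsb */

	write_lock_bh(&ls->ls_rsbtbl_lock);
	/* no repeated dlm_search_rsb_tree() here: rcu keeps r valid,
	 * and RSB_HASHED says whether it is still in the hash table
	 */
	if (!rsb_flag(r, RSB_HASHED)) {
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		error = -EAGAIN;	/* assumed: caller would create a new rsb */
		goto out;
	}
	/* ... move r from inactive back to active under the write lock ... */
	*r_ret = r;
	write_unlock_bh(&ls->ls_rsbtbl_lock);
out:
	rcu_read_unlock();
	return error;
}

The real logic lives in find_rsb_dir()/find_rsb_nodir() and dlm_master_lookup() in the diff below.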
Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/dlm_internal.h | 2 + fs/dlm/lock.c | 102 +++++++++++++++++++++++++++++++++++------- fs/dlm/memory.c | 8 +++- 3 files changed, 96 insertions(+), 16 deletions(-) diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 818484315906..e06fa17c5603 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -333,6 +333,7 @@ struct dlm_rsb { struct list_head res_masters_list; /* used for recovery */ struct list_head res_recover_list; /* used for recovery */ int res_recover_locks_count; + struct rcu_head rcu; char *res_lvbptr; char res_name[DLM_RESNAME_MAXLEN+1]; @@ -366,6 +367,7 @@ enum rsb_flags { RSB_RECOVER_GRANT, RSB_RECOVER_LVB_INVAL, RSB_INACTIVE, + RSB_HASHED, /* set while rsb is on ls_rsbtbl */ }; static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag) diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 5ca3f29bef7d..8bee4f444afd 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -563,6 +563,7 @@ void dlm_rsb_scan(struct timer_list *timer) list_del(&r->res_slow_list); rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, dlm_rhash_rsb_params); + rsb_clear_flag(r, RSB_HASHED); /* ls_rsbtbl_lock is not needed when calling send_remove() */ write_unlock(&ls->ls_rsbtbl_lock); @@ -636,8 +637,14 @@ int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len, static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash) { - return rhashtable_insert_fast(rhash, &rsb->res_node, - dlm_rhash_rsb_params); + int rv; + + rv = rhashtable_insert_fast(rhash, &rsb->res_node, + dlm_rhash_rsb_params); + if (!rv) + rsb_set_flag(rsb, RSB_HASHED); + + return rv; } /* @@ -752,11 +759,23 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, do_inactive: write_lock_bh(&ls->ls_rsbtbl_lock); - /* retry lookup under write lock to see if its still in inactive state - * if not it's in active state and we relookup - unlikely path. + /* + * The expectation here is that the rsb will have HASHED and + * INACTIVE flags set, and that the rsb can be moved from + * inactive back to active again. However, between releasing + * the read lock and acquiring the write lock, this rsb could + * have been removed from rsbtbl, and had HASHED cleared, to + * be freed. To deal with this case, we would normally need + * to repeat dlm_search_rsb_tree while holding the write lock, + * but rcu allows us to simply check the HASHED flag, because + * the rcu read lock means the rsb will not be freed yet. + * If the HASHED flag is not set, then the rsb is being freed, + * so we add a new rsb struct. If the HASHED flag is set, + * and INACTIVE is not set, it means another thread has + * made the rsb active, as we're expecting to do here, and + * we just repeat the lookup (this will be very unlikely.) */ - error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); - if (!error) { + if (rsb_flag(r, RSB_HASHED)) { if (!rsb_flag(r, RSB_INACTIVE)) { write_unlock_bh(&ls->ls_rsbtbl_lock); goto retry; @@ -926,11 +945,8 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, do_inactive: write_lock_bh(&ls->ls_rsbtbl_lock); - /* retry lookup under write lock to see if its still inactive. - * if it's active, repeat lookup - unlikely path. - */ - error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); - if (!error) { + /* See comment in find_rsb_dir. 
*/ + if (rsb_flag(r, RSB_HASHED)) { if (!rsb_flag(r, RSB_INACTIVE)) { write_unlock_bh(&ls->ls_rsbtbl_lock); goto retry; @@ -1012,12 +1028,54 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, return error; } +/* + * rsb rcu usage + * + * While rcu read lock is held, the rsb cannot be freed, + * which allows a lookup optimization. + * + * Two threads are accessing the same rsb concurrently, + * the first (A) is trying to use the rsb, the second (B) + * is trying to free the rsb. + * + * thread A thread B + * (trying to use rsb) (trying to free rsb) + * + * A1. rcu read lock + * A2. rsbtbl read lock + * A3. look up rsb in rsbtbl + * A4. rsbtbl read unlock + * B1. rsbtbl write lock + * B2. look up rsb in rsbtbl + * B3. remove rsb from rsbtbl + * B4. clear rsb HASHED flag + * B5. rsbtbl write unlock + * B6. begin freeing rsb using rcu... + * + * (rsb is inactive, so try to make it active again) + * A5. read rsb HASHED flag (safe because rsb is not freed yet) + * A6. the rsb HASHED flag is not set, which it means the rsb + * is being removed from rsbtbl and freed, so don't use it. + * A7. rcu read unlock + * + * B7. ...finish freeing rsb using rcu + * A8. create a new rsb + * + * Without the rcu optimization, steps A5-8 would need to do + * an extra rsbtbl lookup: + * A5. rsbtbl write lock + * A6. look up rsb in rsbtbl, not found + * A7. rsbtbl write unlock + * A8. create a new rsb + */ + static int find_rsb(struct dlm_ls *ls, const void *name, int len, int from_nodeid, unsigned int flags, struct dlm_rsb **r_ret) { int dir_nodeid; uint32_t hash; + int rv; if (len > DLM_RESNAME_MAXLEN) return -EINVAL; @@ -1025,12 +1083,15 @@ static int find_rsb(struct dlm_ls *ls, const void *name, int len, hash = jhash(name, len, 0); dir_nodeid = dlm_hash2nodeid(ls, hash); + rcu_read_lock(); if (dlm_no_directory(ls)) - return find_rsb_nodir(ls, name, len, hash, dir_nodeid, + rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid, from_nodeid, flags, r_ret); else - return find_rsb_dir(ls, name, len, hash, dir_nodeid, + rv = find_rsb_dir(ls, name, len, hash, dir_nodeid, from_nodeid, flags, r_ret); + rcu_read_unlock(); + return rv; } /* we have received a request and found that res_master_nodeid != our_nodeid, @@ -1187,8 +1248,8 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no * . 
dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0) */ -int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, - int len, unsigned int flags, int *r_nodeid, int *result) +static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, + int len, unsigned int flags, int *r_nodeid, int *result) { struct dlm_rsb *r = NULL; uint32_t hash; @@ -1315,6 +1376,16 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, return error; } +int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, + int len, unsigned int flags, int *r_nodeid, int *result) +{ + int rv; + rcu_read_lock(); + rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result); + rcu_read_unlock(); + return rv; +} + static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) { struct dlm_rsb *r; @@ -4293,6 +4364,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) list_del(&r->res_slow_list); rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, dlm_rhash_rsb_params); + rsb_clear_flag(r, RSB_HASHED); write_unlock_bh(&ls->ls_rsbtbl_lock); free_inactive_rsb(r); diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c index 15a8b1cee433..105a79978706 100644 --- a/fs/dlm/memory.c +++ b/fs/dlm/memory.c @@ -101,13 +101,19 @@ struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls) return r; } -void dlm_free_rsb(struct dlm_rsb *r) +static void __free_rsb_rcu(struct rcu_head *rcu) { + struct dlm_rsb *r = container_of(rcu, struct dlm_rsb, rcu); if (r->res_lvbptr) dlm_free_lvb(r->res_lvbptr); kmem_cache_free(rsb_cache, r); } +void dlm_free_rsb(struct dlm_rsb *r) +{ + call_rcu(&r->rcu, __free_rsb_rcu); +} + struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls) { struct dlm_lkb *lkb; From d3d85e9ad55b973eff3641dd3a61990a2c810785 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Mon, 3 Jun 2024 17:55:54 -0400 Subject: [PATCH 11/17] dlm: use LSFL_FS to check for kernel lockspace The existing external lockspace flag DLM_LSFL_FS is now also saved as an internal flag LSFL_FS, so it can be checked from other code locations which want to know if a lockspace is used from the kernel or user space. 
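In condensed form (an illustrative excerpt; see the diff below for the actual changes), the new internal flag works like this:

	/* new_lockspace(): remember whether this is a kernel (fs) lockspace */
	if (flags & DLM_LSFL_FS)
		set_bit(LSFL_FS, &ls->ls_flags);

	/* elsewhere, e.g. dlm_callback_start(): */
	if (!test_bit(LSFL_FS, &ls->ls_flags))
		return 0;	/* user space lockspace, no callback workqueue needed */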
Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/ast.c | 17 +++++++++++------ fs/dlm/dlm_internal.h | 1 + fs/dlm/lockspace.c | 13 +++++++------ 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c index 59711486d801..52ce27031314 100644 --- a/fs/dlm/ast.c +++ b/fs/dlm/ast.c @@ -161,6 +161,9 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, int dlm_callback_start(struct dlm_ls *ls) { + if (!test_bit(LSFL_FS, &ls->ls_flags)) + return 0; + ls->ls_callback_wq = alloc_ordered_workqueue("dlm_callback", WQ_HIGHPRI | WQ_MEM_RECLAIM); if (!ls->ls_callback_wq) { @@ -178,13 +181,15 @@ void dlm_callback_stop(struct dlm_ls *ls) void dlm_callback_suspend(struct dlm_ls *ls) { - if (ls->ls_callback_wq) { - spin_lock_bh(&ls->ls_cb_lock); - set_bit(LSFL_CB_DELAY, &ls->ls_flags); - spin_unlock_bh(&ls->ls_cb_lock); + if (!test_bit(LSFL_FS, &ls->ls_flags)) + return; + spin_lock_bh(&ls->ls_cb_lock); + set_bit(LSFL_CB_DELAY, &ls->ls_flags); + spin_unlock_bh(&ls->ls_cb_lock); + + if (ls->ls_callback_wq) flush_workqueue(ls->ls_callback_wq); - } } #define MAX_CB_QUEUE 25 @@ -195,7 +200,7 @@ void dlm_callback_resume(struct dlm_ls *ls) int count = 0, sum = 0; bool empty; - if (!ls->ls_callback_wq) + if (!test_bit(LSFL_FS, &ls->ls_flags)) return; more: diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index e06fa17c5603..c6baf25b9cae 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -698,6 +698,7 @@ struct dlm_ls { #define LSFL_CB_DELAY 9 #define LSFL_NODIR 10 #define LSFL_RECV_MSG_BLOCKED 11 +#define LSFL_FS 12 #define DLM_PROC_FLAGS_CLOSING 1 #define DLM_PROC_FLAGS_COMPAT 2 diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index 3990880faea7..8a4351ee9a42 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -499,12 +499,13 @@ static int new_lockspace(const char *name, const char *cluster, list_add(&ls->ls_list, &lslist); spin_unlock_bh(&lslist_lock); - if (flags & DLM_LSFL_FS) { - error = dlm_callback_start(ls); - if (error) { - log_error(ls, "can't start dlm_callback %d", error); - goto out_delist; - } + if (flags & DLM_LSFL_FS) + set_bit(LSFL_FS, &ls->ls_flags); + + error = dlm_callback_start(ls); + if (error) { + log_error(ls, "can't start dlm_callback %d", error); + goto out_delist; } init_waitqueue_head(&ls->ls_recover_lock_wait); From f328a26eeb5380bc74e58cb9c3280a4908452df7 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Mon, 3 Jun 2024 17:55:55 -0400 Subject: [PATCH 12/17] dlm: introduce DLM_LSFL_SOFTIRQ_SAFE Introduce a new external lockspace flag DLM_LSFL_SOFTIRQ_SAFE. A lockspace user will set this flag if it can handle dlm running the callback functions from softirq context. When not set, dlm will continue to run callback functions from the dlm_callback workqueue. The new lockspace flag cannot be used for user space lockspaces, so a uapi placeholder definition is used for the new flag value. 
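A kernel lockspace user would opt in roughly as follows (hypothetical caller; example_ls_ops and example_arg are assumed names, and md-cluster is converted this way later in the series):

	dlm_lockspace_t *ls;
	int ops_result, error;

	error = dlm_new_lockspace("example-ls", "example-cluster",
				  DLM_LSFL_SOFTIRQ, 64 /* lvblen */,
				  &example_ls_ops, example_arg,
				  &ops_result, &ls);
	if (error)
		return error;

User space lockspaces cannot opt in; dlm_new_user_lockspace() rejects the flag, as shown in the diff below.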
Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lockspace.c | 3 +++ include/linux/dlm.h | 17 ++++++++++++++++- include/uapi/linux/dlm.h | 2 ++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index 8a4351ee9a42..a7ac0fcb4ef3 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -629,6 +629,9 @@ int dlm_new_user_lockspace(const char *name, const char *cluster, void *ops_arg, int *ops_result, dlm_lockspace_t **lockspace) { + if (flags & DLM_LSFL_SOFTIRQ) + return -EINVAL; + return __dlm_new_lockspace(name, cluster, flags, lvblen, ops, ops_arg, ops_result, lockspace); } diff --git a/include/linux/dlm.h b/include/linux/dlm.h index c58c4f790c04..bacda9898f2b 100644 --- a/include/linux/dlm.h +++ b/include/linux/dlm.h @@ -35,6 +35,9 @@ struct dlm_lockspace_ops { int num_slots, int our_slot, uint32_t generation); }; +/* only relevant for kernel lockspaces, will be removed in future */ +#define DLM_LSFL_SOFTIRQ __DLM_LSFL_RESERVED0 + /* * dlm_new_lockspace * @@ -55,6 +58,11 @@ struct dlm_lockspace_ops { * used to select the directory node. Must be the same on all nodes. * DLM_LSFL_NEWEXCL * dlm_new_lockspace() should return -EEXIST if the lockspace exists. + * DLM_LSFL_SOFTIRQ + * dlm request callbacks (ast, bast) are softirq safe. Flag should be + * preferred by users. Will be default in some future. If set the + * strongest context for ast, bast callback is softirq as it avoids + * an additional context switch. * * lvblen: length of lvb in bytes. Must be multiple of 8. * dlm_new_lockspace() returns an error if this does not match @@ -121,7 +129,14 @@ int dlm_release_lockspace(dlm_lockspace_t *lockspace, int force); * call. * * AST routines should not block (at least not for long), but may make - * any locking calls they please. + * any locking calls they please. If DLM_LSFL_SOFTIRQ for kernel + * users of dlm_new_lockspace() is passed the ast and bast callbacks + * can be processed in softirq context. Also some of the callback + * contexts are in the same context as the DLM lock request API, users + * must not hold locks while calling dlm lock request API and trying + * to acquire this lock in the callback again, this will end in a + * lock recursion. For newer implementation the DLM_LSFL_SOFTIRQ + * should be used. */ int dlm_lock(dlm_lockspace_t *lockspace, diff --git a/include/uapi/linux/dlm.h b/include/uapi/linux/dlm.h index e7e905fb0bb2..4eaf835780b0 100644 --- a/include/uapi/linux/dlm.h +++ b/include/uapi/linux/dlm.h @@ -71,6 +71,8 @@ struct dlm_lksb { /* DLM_LSFL_TIMEWARN is deprecated and reserved. DO NOT USE! */ #define DLM_LSFL_TIMEWARN 0x00000002 #define DLM_LSFL_NEWEXCL 0x00000008 +/* currently reserved due in-kernel use */ +#define __DLM_LSFL_RESERVED0 0x00000010 #endif /* _UAPI__DLM_DOT_H__ */ From 68bde2a67a6d6e5a2b8e1a64bad51dd8c3975256 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Mon, 3 Jun 2024 17:55:56 -0400 Subject: [PATCH 13/17] dlm: implement LSFL_SOFTIRQ_SAFE When a lockspace user allows it, run callback functions directly from softirq context, instead of queueing callbacks to be run from the dlm_callback workqueue context. 
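The core of the change is the dispatch decision in dlm_add_cb(), shown here in condensed form (an excerpt of the logic from the diff below, without the LSFL_CB_DELAY handling):

	if (test_bit(LSFL_SOFTIRQ, &ls->ls_flags)) {
		/* run the ast/bast directly, possibly from softirq context */
		dlm_run_callback(ls->ls_global_id, lkb->lkb_id, mode, flags,
				 sbflags, status, lkb->lkb_lksb,
				 lkb->lkb_astfn, lkb->lkb_bastfn,
				 lkb->lkb_astparam, rsb->res_name,
				 rsb->res_length);
	} else {
		/* otherwise keep using the dlm_callback workqueue */
		rv = dlm_get_queue_cb(lkb, flags, mode, status, sbflags, &cb);
		if (!rv)
			queue_work(ls->ls_callback_wq, &cb->work);
	}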
Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/ast.c | 155 +++++++++++++++++++++++++++--------------- fs/dlm/ast.h | 11 ++- fs/dlm/dlm_internal.h | 1 + fs/dlm/lockspace.c | 3 + fs/dlm/user.c | 38 +++++------ 5 files changed, 125 insertions(+), 83 deletions(-) diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c index 52ce27031314..742b30b61c19 100644 --- a/fs/dlm/ast.c +++ b/fs/dlm/ast.c @@ -18,35 +18,52 @@ #include "user.h" #include "ast.h" +static void dlm_run_callback(uint32_t ls_id, uint32_t lkb_id, int8_t mode, + uint32_t flags, uint8_t sb_flags, int sb_status, + struct dlm_lksb *lksb, + void (*astfn)(void *astparam), + void (*bastfn)(void *astparam, int mode), + void *astparam, const char *res_name, + size_t res_length) +{ + if (flags & DLM_CB_BAST) { + trace_dlm_bast(ls_id, lkb_id, mode, res_name, res_length); + bastfn(astparam, mode); + } else if (flags & DLM_CB_CAST) { + trace_dlm_ast(ls_id, lkb_id, sb_status, sb_flags, res_name, + res_length); + lksb->sb_status = sb_status; + lksb->sb_flags = sb_flags; + astfn(astparam); + } +} + +static void dlm_do_callback(struct dlm_callback *cb) +{ + dlm_run_callback(cb->ls_id, cb->lkb_id, cb->mode, cb->flags, + cb->sb_flags, cb->sb_status, cb->lkb_lksb, + cb->astfn, cb->bastfn, cb->astparam, + cb->res_name, cb->res_length); + dlm_free_cb(cb); +} + static void dlm_callback_work(struct work_struct *work) { struct dlm_callback *cb = container_of(work, struct dlm_callback, work); - if (cb->flags & DLM_CB_BAST) { - trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name, - cb->res_length); - cb->bastfn(cb->astparam, cb->mode); - } else if (cb->flags & DLM_CB_CAST) { - trace_dlm_ast(cb->ls_id, cb->lkb_id, cb->sb_status, - cb->sb_flags, cb->res_name, cb->res_length); - cb->lkb_lksb->sb_status = cb->sb_status; - cb->lkb_lksb->sb_flags = cb->sb_flags; - cb->astfn(cb->astparam); - } - - dlm_free_cb(cb); + dlm_do_callback(cb); } -int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, - int status, uint32_t sbflags, - struct dlm_callback **cb) +bool dlm_may_skip_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, + int status, uint32_t sbflags, int *copy_lvb) { struct dlm_rsb *rsb = lkb->lkb_resource; - int rv = DLM_ENQUEUE_CALLBACK_SUCCESS; struct dlm_ls *ls = rsb->res_ls; - int copy_lvb = 0; int prev_mode; + if (copy_lvb) + *copy_lvb = 0; + if (flags & DLM_CB_BAST) { /* if cb is a bast, it should be skipped if the blocking mode is * compatible with the last granted mode @@ -56,7 +73,7 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, log_debug(ls, "skip %x bast mode %d for cast mode %d", lkb->lkb_id, mode, lkb->lkb_last_cast_cb_mode); - goto out; + return true; } } @@ -74,7 +91,7 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, (prev_mode > mode && prev_mode > DLM_LOCK_PR)) { log_debug(ls, "skip %x add bast mode %d for bast mode %d", lkb->lkb_id, mode, prev_mode); - goto out; + return true; } } @@ -85,8 +102,10 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, prev_mode = lkb->lkb_last_cast_cb_mode; if (!status && lkb->lkb_lksb->sb_lvbptr && - dlm_lvb_operations[prev_mode + 1][mode + 1]) - copy_lvb = 1; + dlm_lvb_operations[prev_mode + 1][mode + 1]) { + if (copy_lvb) + *copy_lvb = 1; + } } lkb->lkb_last_cast_cb_mode = mode; @@ -96,11 +115,19 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, lkb->lkb_last_cb_mode = mode; lkb->lkb_last_cb_flags = flags; + return false; +} + +int dlm_get_cb(struct dlm_lkb 
*lkb, uint32_t flags, int mode, + int status, uint32_t sbflags, + struct dlm_callback **cb) +{ + struct dlm_rsb *rsb = lkb->lkb_resource; + struct dlm_ls *ls = rsb->res_ls; + *cb = dlm_allocate_cb(); - if (!*cb) { - rv = DLM_ENQUEUE_CALLBACK_FAILURE; - goto out; - } + if (WARN_ON_ONCE(!*cb)) + return -ENOMEM; /* for tracing */ (*cb)->lkb_id = lkb->lkb_id; @@ -112,19 +139,34 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, (*cb)->mode = mode; (*cb)->sb_status = status; (*cb)->sb_flags = (sbflags & 0x000000FF); - (*cb)->copy_lvb = copy_lvb; (*cb)->lkb_lksb = lkb->lkb_lksb; - rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED; + return 0; +} -out: - return rv; +static int dlm_get_queue_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, + int status, uint32_t sbflags, + struct dlm_callback **cb) +{ + int rv; + + rv = dlm_get_cb(lkb, flags, mode, status, sbflags, cb); + if (rv) + return rv; + + (*cb)->astfn = lkb->lkb_astfn; + (*cb)->bastfn = lkb->lkb_bastfn; + (*cb)->astparam = lkb->lkb_astparam; + INIT_WORK(&(*cb)->work, dlm_callback_work); + + return 0; } void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, - uint32_t sbflags) + uint32_t sbflags) { - struct dlm_ls *ls = lkb->lkb_resource->res_ls; + struct dlm_rsb *rsb = lkb->lkb_resource; + struct dlm_ls *ls = rsb->res_ls; struct dlm_callback *cb; int rv; @@ -133,35 +175,34 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, return; } - rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags, - &cb); - switch (rv) { - case DLM_ENQUEUE_CALLBACK_NEED_SCHED: - cb->astfn = lkb->lkb_astfn; - cb->bastfn = lkb->lkb_bastfn; - cb->astparam = lkb->lkb_astparam; - INIT_WORK(&cb->work, dlm_callback_work); + if (dlm_may_skip_callback(lkb, flags, mode, status, sbflags, NULL)) + return; - spin_lock_bh(&ls->ls_cb_lock); - if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) + spin_lock_bh(&ls->ls_cb_lock); + if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) { + rv = dlm_get_queue_cb(lkb, flags, mode, status, sbflags, &cb); + if (!rv) list_add(&cb->list, &ls->ls_cb_delay); - else - queue_work(ls->ls_callback_wq, &cb->work); - spin_unlock_bh(&ls->ls_cb_lock); - break; - case DLM_ENQUEUE_CALLBACK_SUCCESS: - break; - case DLM_ENQUEUE_CALLBACK_FAILURE: - fallthrough; - default: - WARN_ON_ONCE(1); - break; + } else { + if (test_bit(LSFL_SOFTIRQ, &ls->ls_flags)) { + dlm_run_callback(ls->ls_global_id, lkb->lkb_id, mode, flags, + sbflags, status, lkb->lkb_lksb, + lkb->lkb_astfn, lkb->lkb_bastfn, + lkb->lkb_astparam, rsb->res_name, + rsb->res_length); + } else { + rv = dlm_get_queue_cb(lkb, flags, mode, status, sbflags, &cb); + if (!rv) + queue_work(ls->ls_callback_wq, &cb->work); + } } + spin_unlock_bh(&ls->ls_cb_lock); } int dlm_callback_start(struct dlm_ls *ls) { - if (!test_bit(LSFL_FS, &ls->ls_flags)) + if (!test_bit(LSFL_FS, &ls->ls_flags) || + test_bit(LSFL_SOFTIRQ, &ls->ls_flags)) return 0; ls->ls_callback_wq = alloc_ordered_workqueue("dlm_callback", @@ -207,7 +248,11 @@ void dlm_callback_resume(struct dlm_ls *ls) spin_lock_bh(&ls->ls_cb_lock); list_for_each_entry_safe(cb, safe, &ls->ls_cb_delay, list) { list_del(&cb->list); - queue_work(ls->ls_callback_wq, &cb->work); + if (test_bit(LSFL_SOFTIRQ, &ls->ls_flags)) + dlm_do_callback(cb); + else + queue_work(ls->ls_callback_wq, &cb->work); + count++; if (count == MAX_CB_QUEUE) break; diff --git a/fs/dlm/ast.h b/fs/dlm/ast.h index 9093ff043bee..e2b86845d331 100644 --- a/fs/dlm/ast.h +++ b/fs/dlm/ast.h @@ -11,12 +11,11 @@ #ifndef __ASTD_DOT_H__ #define 
__ASTD_DOT_H__ -#define DLM_ENQUEUE_CALLBACK_NEED_SCHED 1 -#define DLM_ENQUEUE_CALLBACK_SUCCESS 0 -#define DLM_ENQUEUE_CALLBACK_FAILURE -1 -int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, - int status, uint32_t sbflags, - struct dlm_callback **cb); +bool dlm_may_skip_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, + int status, uint32_t sbflags, int *copy_lvb); +int dlm_get_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, + int status, uint32_t sbflags, + struct dlm_callback **cb); void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, uint32_t sbflags); diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index c6baf25b9cae..32d98e63d25e 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -699,6 +699,7 @@ struct dlm_ls { #define LSFL_NODIR 10 #define LSFL_RECV_MSG_BLOCKED 11 #define LSFL_FS 12 +#define LSFL_SOFTIRQ 13 #define DLM_PROC_FLAGS_CLOSING 1 #define DLM_PROC_FLAGS_COMPAT 2 diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index a7ac0fcb4ef3..7c4f45ad2245 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -407,6 +407,9 @@ static int new_lockspace(const char *name, const char *cluster, ls->ls_ops_arg = ops_arg; } + if (flags & DLM_LSFL_SOFTIRQ) + set_bit(LSFL_SOFTIRQ, &ls->ls_flags); + /* ls_exflags are forced to match among nodes, and we don't * need to require all nodes to have some flags set */ diff --git a/fs/dlm/user.c b/fs/dlm/user.c index f6635a5314f4..5cb3896be826 100644 --- a/fs/dlm/user.c +++ b/fs/dlm/user.c @@ -182,7 +182,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, struct dlm_user_args *ua; struct dlm_user_proc *proc; struct dlm_callback *cb; - int rv; + int rv, copy_lvb; if (test_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags) || test_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags)) @@ -213,28 +213,22 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, spin_lock_bh(&proc->asts_spin); - rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags, &cb); - switch (rv) { - case DLM_ENQUEUE_CALLBACK_NEED_SCHED: - cb->ua = *ua; - cb->lkb_lksb = &cb->ua.lksb; - if (cb->copy_lvb) { - memcpy(cb->lvbptr, ua->lksb.sb_lvbptr, - DLM_USER_LVB_LEN); - cb->lkb_lksb->sb_lvbptr = cb->lvbptr; - } + if (!dlm_may_skip_callback(lkb, flags, mode, status, sbflags, + ©_lvb)) { + rv = dlm_get_cb(lkb, flags, mode, status, sbflags, &cb); + if (!rv) { + cb->copy_lvb = copy_lvb; + cb->ua = *ua; + cb->lkb_lksb = &cb->ua.lksb; + if (copy_lvb) { + memcpy(cb->lvbptr, ua->lksb.sb_lvbptr, + DLM_USER_LVB_LEN); + cb->lkb_lksb->sb_lvbptr = cb->lvbptr; + } - list_add_tail(&cb->list, &proc->asts); - wake_up_interruptible(&proc->wait); - break; - case DLM_ENQUEUE_CALLBACK_SUCCESS: - break; - case DLM_ENQUEUE_CALLBACK_FAILURE: - fallthrough; - default: - spin_unlock_bh(&proc->asts_spin); - WARN_ON_ONCE(1); - goto out; + list_add_tail(&cb->list, &proc->asts); + wake_up_interruptible(&proc->wait); + } } spin_unlock_bh(&proc->asts_spin); From 5ce02000eb29db98dc2909b1a346f68acdd9db80 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Mon, 3 Jun 2024 17:55:58 -0400 Subject: [PATCH 14/17] md-cluster: use DLM_LSFL_SOFTIRQ for dlm_new_lockspace() Use the recently added DLM_LSFL_SOFTIRQ flag in dlm_new_lockspace(), signalling the ability to handle callbacks being run from softirq context. The md-cluster callback functions only call complete(), which is suitable for softirq. This should make dlm lock request completions more efficient by avoiding the workqueue context switch. 
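The callbacks that make this safe are completion-only, along the lines of this sketch (assumed example names, modelled on the pattern described above rather than copied from md-cluster):

struct example_lock_resource {
	struct completion completion;
	/* ... */
};

static void example_ast(void *arg)
{
	struct example_lock_resource *res = arg;

	/* complete() may be called from softirq context, so no
	 * workqueue hand-off is needed
	 */
	complete(&res->completion);
}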
Acked-by: Heming Zhao
Acked-by: Song Liu
Signed-off-by: Alexander Aring
Signed-off-by: David Teigland
---
 drivers/md/md-cluster.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/drivers/md/md-cluster.c b/drivers/md/md-cluster.c
index 8e36a0feec09..eb9bbf12c8d8 100644
--- a/drivers/md/md-cluster.c
+++ b/drivers/md/md-cluster.c
@@ -887,7 +887,7 @@ static int join(struct mddev *mddev, int nodes)
 	memset(str, 0, 64);
 	sprintf(str, "%pU", mddev->uuid);
 	ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
-				0, LVB_SIZE, &md_ls_ops, mddev,
+				DLM_LSFL_SOFTIRQ, LVB_SIZE, &md_ls_ops, mddev,
 				&ops_rv, &cinfo->lockspace);
 	if (ret)
 		goto err;

From ec5530d6932ec35e92d3ef60d51b0dbc216c8049 Mon Sep 17 00:00:00 2001
From: "Dr. David Alan Gilbert"
Date: Fri, 31 May 2024 00:49:18 +0100
Subject: [PATCH 15/17] fs: dlm: remove unused struct 'dlm_processed_nodes'

The last use of 'dlm_processed_nodes' was removed in commit
1696c75f1864 ("fs: dlm: add send ack threshold and append acks to msgs").

Remove it.

Signed-off-by: Dr. David Alan Gilbert
Signed-off-by: David Teigland
---
 fs/dlm/lowcomms.c | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 591385701c7d..2e3e269d820e 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -858,12 +858,6 @@ static void free_processqueue_entry(struct processqueue_entry *pentry)
 	kfree(pentry);
 }
 
-struct dlm_processed_nodes {
-	int nodeid;
-
-	struct list_head list;
-};
-
 static void process_dlm_messages(struct work_struct *work)
 {
 	struct processqueue_entry *pentry;

From 79ced51e2e5670da67339d5e21818cbc7ce60646 Mon Sep 17 00:00:00 2001
From: Alexander Aring
Date: Wed, 12 Jun 2024 17:15:56 -0400
Subject: [PATCH 16/17] dlm: remove DLM_LSFL_SOFTIRQ from exflags

The DLM rcom handling checks that the exflags are the same on all
lockspace member nodes. Some flags require such handling, but
DLM_LSFL_SOFTIRQ does not, and it should remain backwards compatible
with lockspaces on nodes that do not set this flag.

Fixes: f328a26eeb53 ("dlm: introduce DLM_LSFL_SOFTIRQ_SAFE")
Signed-off-by: Alexander Aring
Signed-off-by: David Teigland
---
 fs/dlm/lockspace.c | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 7c4f45ad2245..1848cbbc96a9 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -413,7 +413,8 @@ static int new_lockspace(const char *name, const char *cluster,
 	/* ls_exflags are forced to match among nodes, and we don't
 	 * need to require all nodes to have some flags set
 	 */
-	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
+	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
+				    DLM_LSFL_SOFTIRQ));
 
 	INIT_LIST_HEAD(&ls->ls_slow_inactive);
 	INIT_LIST_HEAD(&ls->ls_slow_active);

From 89b01913dc73d7c4b8440b1396909ccb7ec8c4b4 Mon Sep 17 00:00:00 2001
From: Alexander Aring
Date: Thu, 13 Jun 2024 13:06:40 -0400
Subject: [PATCH 17/17] dlm: add rcu_barrier before destroy kmem cache

dlm_free_rsb() frees rsb structs via call_rcu(); the rcu callback does
a kfree() of res_lvbptr and a kmem_cache_free() of the rsb itself. We
need to wait until these pending operations are done before calling
kmem_cache_destroy(). We do that with rcu_barrier(), which waits until
all pending call_rcu() callbacks have run. This avoids
kmem_cache_destroy() complaining about active objects that have not
yet been freed by call_rcu().
There is ongoing discussion about improving this behaviour, see:

https://lore.kernel.org/netdev/20240609082726.32742-1-Julia.Lawall@inria.fr/

However, that only applies when the call_rcu() callback does nothing
but free the object, so that it can be replaced by a kfree_rcu() call,
which currently has some issues. That isn't our case, because we also
free res_lvbptr when it is set. For our case, to avoid the above race,
rcu_barrier() should be used before calling kmem_cache_destroy() to be
sure that there are no pending objects around. This is exactly what
net/batman-adv does before calling its kmem_cache_destroy() on module
unload.

Fixes: 01fdeca1cc2d ("dlm: use rcu to avoid an extra rsb struct lookup")
Signed-off-by: Alexander Aring
Signed-off-by: David Teigland
---
 fs/dlm/memory.c | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index 105a79978706..8c44b954c166 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -72,6 +72,8 @@ int __init dlm_memory_init(void)
 
 void dlm_memory_exit(void)
 {
+	rcu_barrier();
+
 	kmem_cache_destroy(writequeue_cache);
 	kmem_cache_destroy(mhandle_cache);
 	kmem_cache_destroy(msg_cache);
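For reference, the generic pattern this last patch relies on can be sketched as follows (assumed example_* names, not the dlm code itself):

#include <linux/rcupdate.h>
#include <linux/slab.h>

struct example_obj {
	struct rcu_head rcu;
	void *extra;
};

static struct kmem_cache *example_cache;

static void example_free_rcu(struct rcu_head *rcu)
{
	struct example_obj *obj = container_of(rcu, struct example_obj, rcu);

	kfree(obj->extra);			/* like res_lvbptr */
	kmem_cache_free(example_cache, obj);	/* like the rsb itself */
}

static void example_free(struct example_obj *obj)
{
	call_rcu(&obj->rcu, example_free_rcu);
}

static void example_exit(void)
{
	rcu_barrier();	/* wait for all pending example_free_rcu() calls */
	kmem_cache_destroy(example_cache);	/* now safe, no active objects */
}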