From 35d6df53b91632e8b20d3bd23510e68799c47a25 Mon Sep 17 00:00:00 2001 From: Azeem Shaikh Date: Wed, 10 May 2023 22:12:37 +0000 Subject: [PATCH 01/19] dlm: Replace all non-returning strlcpy with strscpy strlcpy() reads the entire source buffer first. This read may exceed the destination size limit. This is both inefficient and can lead to linear read overflows if a source string is not NUL-terminated [1]. In an effort to remove strlcpy() completely [2], replace strlcpy() here with strscpy(). No return values were used, so direct replacement is safe. [1] https://www.kernel.org/doc/html/latest/process/deprecated.html#strlcpy [2] https://github.com/KSPP/linux/issues/89 Signed-off-by: Azeem Shaikh Signed-off-by: David Teigland --- fs/dlm/config.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fs/dlm/config.c b/fs/dlm/config.c index d31319d08581..2beceff024e3 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -116,9 +116,9 @@ static ssize_t cluster_cluster_name_store(struct config_item *item, { struct dlm_cluster *cl = config_item_to_cluster(item); - strlcpy(dlm_config.ci_cluster_name, buf, + strscpy(dlm_config.ci_cluster_name, buf, sizeof(dlm_config.ci_cluster_name)); - strlcpy(cl->cl_cluster_name, buf, sizeof(cl->cl_cluster_name)); + strscpy(cl->cl_cluster_name, buf, sizeof(cl->cl_cluster_name)); return len; } From 92655fbda5c05950a411eaabc19e025e86e2a291 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Fri, 19 May 2023 11:21:24 -0400 Subject: [PATCH 02/19] fs: dlm: return positive pid value for F_GETLK The GETLK pid values have all been negated since commit 9d5b86ac13c5 ("fs/locks: Remove fl_nspid and use fs-specific l_pid for remote locks"). Revert this for local pids, and leave in place negative pids for remote owners. Cc: stable@vger.kernel.org Fixes: 9d5b86ac13c5 ("fs/locks: Remove fl_nspid and use fs-specific l_pid for remote locks") Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/plock.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c index ed4357e62f35..ff364901f22b 100644 --- a/fs/dlm/plock.c +++ b/fs/dlm/plock.c @@ -360,7 +360,9 @@ int dlm_posix_get(dlm_lockspace_t *lockspace, u64 number, struct file *file, locks_init_lock(fl); fl->fl_type = (op->info.ex) ? F_WRLCK : F_RDLCK; fl->fl_flags = FL_POSIX; - fl->fl_pid = -op->info.pid; + fl->fl_pid = op->info.pid; + if (op->info.nodeid != dlm_our_nodeid()) + fl->fl_pid = -fl->fl_pid; fl->fl_start = op->info.start; fl->fl_end = op->info.end; rv = 0; From c847f4e203046a2c93d8a1cf0348315c0b655a60 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Fri, 19 May 2023 11:21:25 -0400 Subject: [PATCH 03/19] fs: dlm: fix cleanup pending ops when interrupted Immediately clean up a posix lock request if it is interrupted while waiting for a result from user space (dlm_controld.) This largely reverts the recent commit b92a4e3f86b1 ("fs: dlm: change posix lock sigint handling"). That previous commit attempted to defer lock cleanup to the point in time when a result from user space arrived. The deferred approach was not reliable because some dlm plock ops may not receive replies. Cc: stable@vger.kernel.org Fixes: b92a4e3f86b1 ("fs: dlm: change posix lock sigint handling") Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/plock.c | 25 ++++++------------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c index ff364901f22b..fea2157fac5b 100644 --- a/fs/dlm/plock.c +++ b/fs/dlm/plock.c @@ -30,8 +30,6 @@ struct plock_async_data { struct plock_op { struct list_head list; int done; - /* if lock op got interrupted while waiting dlm_controld reply */ - bool sigint; struct dlm_plock_info info; /* if set indicates async handling */ struct plock_async_data *data; @@ -167,12 +165,14 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file, spin_unlock(&ops_lock); goto do_lock_wait; } - - op->sigint = true; + list_del(&op->list); spin_unlock(&ops_lock); + log_debug(ls, "%s: wait interrupted %x %llx pid %d", __func__, ls->ls_global_id, (unsigned long long)number, op->info.pid); + do_unlock_close(&op->info); + dlm_release_plock_op(op); goto out; } @@ -434,19 +434,6 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count, if (iter->info.fsid == info.fsid && iter->info.number == info.number && iter->info.owner == info.owner) { - if (iter->sigint) { - list_del(&iter->list); - spin_unlock(&ops_lock); - - pr_debug("%s: sigint cleanup %x %llx pid %d", - __func__, iter->info.fsid, - (unsigned long long)iter->info.number, - iter->info.pid); - do_unlock_close(&iter->info); - memcpy(&iter->info, &info, sizeof(info)); - dlm_release_plock_op(iter); - return count; - } list_del_init(&iter->list); memcpy(&iter->info, &info, sizeof(info)); if (iter->data) @@ -465,8 +452,8 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count, else wake_up(&recv_wq); } else - log_print("%s: no op %x %llx", __func__, - info.fsid, (unsigned long long)info.number); + pr_debug("%s: no op %x %llx", __func__, + info.fsid, (unsigned long long)info.number); return count; } From 59e45c758ca1b9893ac923dd63536da946ac333b Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Fri, 19 May 2023 11:21:26 -0400 Subject: [PATCH 04/19] fs: dlm: interrupt posix locks only when process is killed If a posix lock request is waiting for a result from user space (dlm_controld), do not let it be interrupted unless the process is killed. This reverts commit a6b1533e9a57 ("dlm: make posix locks interruptible"). The problem with the interruptible change is that all locks were cleared on any signal interrupt. If a signal was received that did not terminate the process, the process could continue running after all its dlm posix locks had been cleared. A future patch will add cancelation to allow proper interruption. Cc: stable@vger.kernel.org Fixes: a6b1533e9a57 ("dlm: make posix locks interruptible") Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/plock.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c index fea2157fac5b..31bc601ee3d8 100644 --- a/fs/dlm/plock.c +++ b/fs/dlm/plock.c @@ -155,7 +155,7 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file, send_op(op); - rv = wait_event_interruptible(recv_wq, (op->done != 0)); + rv = wait_event_killable(recv_wq, (op->done != 0)); if (rv == -ERESTARTSYS) { spin_lock(&ops_lock); /* recheck under ops_lock if we got a done != 0, From 0f2b1cb89ccdbdcedf7143f4153a4da700a05f48 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Fri, 19 May 2023 11:21:27 -0400 Subject: [PATCH 05/19] fs: dlm: make F_SETLK use unkillable wait_event While a non-waiting posix lock request (F_SETLK) is waiting for user space processing (in dlm_controld), wait for that processing to complete with an unkillable wait_event(). This makes F_SETLK behave the same way for F_RDLCK, F_WRLCK and F_UNLCK. F_SETLKW continues to use wait_event_killable(). Cc: stable@vger.kernel.org Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/plock.c | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c index 31bc601ee3d8..c9e1d5f54194 100644 --- a/fs/dlm/plock.c +++ b/fs/dlm/plock.c @@ -155,25 +155,29 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file, send_op(op); - rv = wait_event_killable(recv_wq, (op->done != 0)); - if (rv == -ERESTARTSYS) { - spin_lock(&ops_lock); - /* recheck under ops_lock if we got a done != 0, - * if so this interrupt case should be ignored - */ - if (op->done != 0) { + if (op->info.wait) { + rv = wait_event_killable(recv_wq, (op->done != 0)); + if (rv == -ERESTARTSYS) { + spin_lock(&ops_lock); + /* recheck under ops_lock if we got a done != 0, + * if so this interrupt case should be ignored + */ + if (op->done != 0) { + spin_unlock(&ops_lock); + goto do_lock_wait; + } + list_del(&op->list); spin_unlock(&ops_lock); - goto do_lock_wait; - } - list_del(&op->list); - spin_unlock(&ops_lock); - log_debug(ls, "%s: wait interrupted %x %llx pid %d", - __func__, ls->ls_global_id, - (unsigned long long)number, op->info.pid); - do_unlock_close(&op->info); - dlm_release_plock_op(op); - goto out; + log_debug(ls, "%s: wait interrupted %x %llx pid %d", + __func__, ls->ls_global_id, + (unsigned long long)number, op->info.pid); + do_unlock_close(&op->info); + dlm_release_plock_op(op); + goto out; + } + } else { + wait_event(recv_wq, (op->done != 0)); } do_lock_wait: From 57e2c2f2d94cfd551af91cedfa1af6d972487197 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Wed, 24 May 2023 12:02:04 -0400 Subject: [PATCH 06/19] fs: dlm: fix mismatch of plock results from userspace When a waiting plock request (F_SETLKW) is sent to userspace for processing (dlm_controld), the result is returned at a later time. That result could be incorrectly matched to a different waiting request in cases where the owner field is the same (e.g. different threads in a process.) This is fixed by comparing all the properties in the request and reply. The results for non-waiting plock requests are now matched based on list order because the results are returned in the same order they were sent. Cc: stable@vger.kernel.org Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/plock.c | 58 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 45 insertions(+), 13 deletions(-) diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c index c9e1d5f54194..70a4752ed913 100644 --- a/fs/dlm/plock.c +++ b/fs/dlm/plock.c @@ -395,7 +395,7 @@ static ssize_t dev_read(struct file *file, char __user *u, size_t count, if (op->info.flags & DLM_PLOCK_FL_CLOSE) list_del(&op->list); else - list_move(&op->list, &recv_list); + list_move_tail(&op->list, &recv_list); memcpy(&info, &op->info, sizeof(info)); } spin_unlock(&ops_lock); @@ -433,20 +433,52 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count, if (check_version(&info)) return -EINVAL; + /* + * The results for waiting ops (SETLKW) can be returned in any + * order, so match all fields to find the op. The results for + * non-waiting ops are returned in the order that they were sent + * to userspace, so match the result with the first non-waiting op. + */ spin_lock(&ops_lock); - list_for_each_entry(iter, &recv_list, list) { - if (iter->info.fsid == info.fsid && - iter->info.number == info.number && - iter->info.owner == info.owner) { - list_del_init(&iter->list); - memcpy(&iter->info, &info, sizeof(info)); - if (iter->data) - do_callback = 1; - else - iter->done = 1; - op = iter; - break; + if (info.wait) { + list_for_each_entry(iter, &recv_list, list) { + if (iter->info.fsid == info.fsid && + iter->info.number == info.number && + iter->info.owner == info.owner && + iter->info.pid == info.pid && + iter->info.start == info.start && + iter->info.end == info.end && + iter->info.ex == info.ex && + iter->info.wait) { + op = iter; + break; + } } + } else { + list_for_each_entry(iter, &recv_list, list) { + if (!iter->info.wait) { + op = iter; + break; + } + } + } + + if (op) { + /* Sanity check that op and info match. */ + if (info.wait) + WARN_ON(op->info.optype != DLM_PLOCK_OP_LOCK); + else + WARN_ON(op->info.fsid != info.fsid || + op->info.number != info.number || + op->info.owner != info.owner || + op->info.optype != info.optype); + + list_del_init(&op->list); + memcpy(&op->info, &info, sizeof(info)); + if (op->data) + do_callback = 1; + else + op->done = 1; } spin_unlock(&ops_lock); From c6b6d6dcc7f32767d57740e0552337c8de40610b Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Mon, 29 May 2023 17:44:29 -0400 Subject: [PATCH 07/19] fs: dlm: revert check required context while close This patch reverts commit 2c3fa6ae4d52 ("dlm: check required context while close"). The function dlm_midcomms_close(), which will call later dlm_lowcomms_close(), is called when the cluster manager tells the node got fenced which means on midcomms/lowcomms layer to disconnect the node from the cluster communication. The node can rejoin the cluster later. This patch was ensuring no new message were able to be triggered when we are in the close() function context. This was done by checking if the lockspace has been stopped. However there is a missing check that we only need to check specific lockspaces where the fenced node is member of. This is currently complicated because there is no way to easily check if a node is part of a specific lockspace without stopping the recovery. For now we just revert this commit as it is just a check to finding possible leaks of stopping lockspaces before close() is called. Cc: stable@vger.kernel.org Fixes: 2c3fa6ae4d52 ("dlm: check required context while close") Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lockspace.c | 12 ------------ fs/dlm/lockspace.h | 1 - fs/dlm/midcomms.c | 3 --- 3 files changed, 16 deletions(-) diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index 67261b7b1f0e..0455dddb0797 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -935,15 +935,3 @@ void dlm_stop_lockspaces(void) log_print("dlm user daemon left %d lockspaces", count); } -void dlm_stop_lockspaces_check(void) -{ - struct dlm_ls *ls; - - spin_lock(&lslist_lock); - list_for_each_entry(ls, &lslist, ls_list) { - if (WARN_ON(!rwsem_is_locked(&ls->ls_in_recovery) || - !dlm_locking_stopped(ls))) - break; - } - spin_unlock(&lslist_lock); -} diff --git a/fs/dlm/lockspace.h b/fs/dlm/lockspace.h index 03f4a4a3a871..47ebd4411926 100644 --- a/fs/dlm/lockspace.h +++ b/fs/dlm/lockspace.h @@ -27,7 +27,6 @@ struct dlm_ls *dlm_find_lockspace_local(void *id); struct dlm_ls *dlm_find_lockspace_device(int minor); void dlm_put_lockspace(struct dlm_ls *ls); void dlm_stop_lockspaces(void); -void dlm_stop_lockspaces_check(void); int dlm_new_user_lockspace(const char *name, const char *cluster, uint32_t flags, int lvblen, const struct dlm_lockspace_ops *ops, diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index c02c43e4980a..3df916a568ba 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -136,7 +136,6 @@ #include #include "dlm_internal.h" -#include "lockspace.h" #include "lowcomms.h" #include "config.h" #include "memory.h" @@ -1491,8 +1490,6 @@ int dlm_midcomms_close(int nodeid) if (nodeid == dlm_our_nodeid()) return 0; - dlm_stop_lockspaces_check(); - idx = srcu_read_lock(&nodes_srcu); /* Abort pending close/remove operation */ node = nodeid2node(nodeid, 0); From 7a931477bff1c7548aa8492bccf600f5f29452b1 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Mon, 29 May 2023 17:44:30 -0400 Subject: [PATCH 08/19] fs: dlm: clear pending bit when queue was empty This patch clears the DLM_IFL_CB_PENDING_BIT flag which will be set when there is callback work queued when there was no callback to dequeue. It is a buggy case and should never happen, that's why there is a WARN_ON(). However if the case happens we are prepared to somehow recover from it. Cc: stable@vger.kernel.org Fixes: 61bed0baa4db ("fs: dlm: use a non-static queue for callbacks") Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/ast.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c index 700ff2e0515a..ff0ef4653535 100644 --- a/fs/dlm/ast.c +++ b/fs/dlm/ast.c @@ -181,10 +181,12 @@ void dlm_callback_work(struct work_struct *work) spin_lock(&lkb->lkb_cb_lock); rv = dlm_dequeue_lkb_callback(lkb, &cb); - spin_unlock(&lkb->lkb_cb_lock); - - if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY)) + if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY)) { + clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags); + spin_unlock(&lkb->lkb_cb_lock); goto out; + } + spin_unlock(&lkb->lkb_cb_lock); for (;;) { castfn = lkb->lkb_astfn; From f68bb23cad1f128198074ed7b3a4c5fb03dbd9d2 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Mon, 29 May 2023 17:44:31 -0400 Subject: [PATCH 09/19] fs: dlm: fix missing pending to false This patch sets the process_dlm_messages_pending boolean to false when there was no message to process. It is a case which should not happen but if we are prepared to recover from this situation by setting pending boolean to false. Cc: stable@vger.kernel.org Fixes: dbb751ffab0b ("fs: dlm: parallelize lowcomms socket handling") Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lowcomms.c | 1 + 1 file changed, 1 insertion(+) diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 3d3802c47b8b..5aad4d4842eb 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -898,6 +898,7 @@ static void process_dlm_messages(struct work_struct *work) pentry = list_first_entry_or_null(&processqueue, struct processqueue_entry, list); if (WARN_ON_ONCE(!pentry)) { + process_dlm_messages_pending = false; spin_unlock(&processqueue_lock); return; } From cbba21169eeff4d43e064c43d0b95b1f89587da3 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Mon, 29 May 2023 17:44:32 -0400 Subject: [PATCH 10/19] fs: dlm: unregister memory at the very last The dlm modules midcomms, debugfs, lockspace, uses kmem caches. We ensure that the kmem caches getting deallocated after those modules exited. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/main.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs/dlm/main.c b/fs/dlm/main.c index 93755f83a30d..6ca28299c9db 100644 --- a/fs/dlm/main.c +++ b/fs/dlm/main.c @@ -73,10 +73,10 @@ static void __exit exit_dlm(void) dlm_plock_exit(); dlm_user_exit(); dlm_config_exit(); - dlm_memory_exit(); dlm_lockspace_exit(); dlm_midcomms_exit(); dlm_unregister_debugfs(); + dlm_memory_exit(); } module_init(init_dlm); From f8bce79d9d9edb8d2302a216f298f4839fb0adb6 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Mon, 29 May 2023 17:44:33 -0400 Subject: [PATCH 11/19] fs: dlm: don't check othercon twice This patch removes an another check if con->othercon set inside the branch which already does that. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lowcomms.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 5aad4d4842eb..b28505b8b23b 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -1497,8 +1497,7 @@ int dlm_lowcomms_close(int nodeid) call_srcu(&connections_srcu, &con->rcu, connection_release); if (con->othercon) { clean_one_writequeue(con->othercon); - if (con->othercon) - call_srcu(&connections_srcu, &con->othercon->rcu, connection_release); + call_srcu(&connections_srcu, &con->othercon->rcu, connection_release); } srcu_read_unlock(&connections_srcu, idx); From d41a1a3db49f4b128404114f693398f2878b5100 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Mon, 29 May 2023 17:44:34 -0400 Subject: [PATCH 12/19] fs: dlm: cleanup STOP_IO bitflag set when stop io There should no difference between setting the CF_IO_STOP flag before restore_callbacks() to do it before or afterwards. The restore_callbacks() will be sure that no callback is executed anymore when the bit wasn't set. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lowcomms.c | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index b28505b8b23b..5a7586633cbe 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -735,19 +735,15 @@ static void stop_connection_io(struct connection *con) if (con->othercon) stop_connection_io(con->othercon); + spin_lock_bh(&con->writequeue_lock); + set_bit(CF_IO_STOP, &con->flags); + spin_unlock_bh(&con->writequeue_lock); + down_write(&con->sock_lock); if (con->sock) { lock_sock(con->sock->sk); restore_callbacks(con->sock->sk); - - spin_lock_bh(&con->writequeue_lock); - set_bit(CF_IO_STOP, &con->flags); - spin_unlock_bh(&con->writequeue_lock); release_sock(con->sock->sk); - } else { - spin_lock_bh(&con->writequeue_lock); - set_bit(CF_IO_STOP, &con->flags); - spin_unlock_bh(&con->writequeue_lock); } up_write(&con->sock_lock); From 5ce9ef30f226c43aa711a4807077447251efa0c6 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Mon, 29 May 2023 17:44:35 -0400 Subject: [PATCH 13/19] fs: dlm: move dlm_purge_lkb_callbacks to user module This patch moves the dlm_purge_lkb_callbacks() function from ast to user dlm module as it is only a function being used by dlm user implementation. I got be hinted to hold specific locks regarding the callback handling for dlm_purge_lkb_callbacks() but it was false positive. It is confusing because ast dlm implementation uses a different locking behaviour as user locks uses as DLM handles kernel and user dlm locks differently. To avoid the confusing we move this function to dlm user implementation. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/ast.c | 17 ----------------- fs/dlm/ast.h | 1 - fs/dlm/user.c | 18 ++++++++++++++++++ fs/dlm/user.h | 1 + 4 files changed, 19 insertions(+), 18 deletions(-) diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c index ff0ef4653535..1f2f70a1b824 100644 --- a/fs/dlm/ast.c +++ b/fs/dlm/ast.c @@ -36,23 +36,6 @@ void dlm_callback_set_last_ptr(struct dlm_callback **from, *from = to; } -void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb) -{ - struct dlm_callback *cb, *safe; - - list_for_each_entry_safe(cb, safe, &lkb->lkb_callbacks, list) { - list_del(&cb->list); - kref_put(&cb->ref, dlm_release_callback); - } - - clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags); - - /* invalidate */ - dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL); - dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL); - lkb->lkb_last_bast_mode = -1; -} - int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, uint32_t sbflags) { diff --git a/fs/dlm/ast.h b/fs/dlm/ast.h index 880b11882495..ce007892dc2d 100644 --- a/fs/dlm/ast.h +++ b/fs/dlm/ast.h @@ -26,7 +26,6 @@ void dlm_callback_set_last_ptr(struct dlm_callback **from, struct dlm_callback *to); void dlm_release_callback(struct kref *ref); -void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb); void dlm_callback_work(struct work_struct *work); int dlm_callback_start(struct dlm_ls *ls); void dlm_callback_stop(struct dlm_ls *ls); diff --git a/fs/dlm/user.c b/fs/dlm/user.c index d9c09fc0aba1..695e691b38b3 100644 --- a/fs/dlm/user.c +++ b/fs/dlm/user.c @@ -145,6 +145,24 @@ static void compat_output(struct dlm_lock_result *res, } #endif +/* should held proc->asts_spin lock */ +void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb) +{ + struct dlm_callback *cb, *safe; + + list_for_each_entry_safe(cb, safe, &lkb->lkb_callbacks, list) { + list_del(&cb->list); + kref_put(&cb->ref, dlm_release_callback); + } + + clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags); + + /* invalidate */ + dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL); + dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL); + lkb->lkb_last_bast_mode = -1; +} + /* Figure out if this lock is at the end of its life and no longer available for the application to use. The lkb still exists until the final ast is read. A lock becomes EOL in three situations: diff --git a/fs/dlm/user.h b/fs/dlm/user.h index 33059452d79e..2caf8e6e24d5 100644 --- a/fs/dlm/user.h +++ b/fs/dlm/user.h @@ -6,6 +6,7 @@ #ifndef __USER_DOT_H__ #define __USER_DOT_H__ +void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb); void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, uint32_t sbflags); int dlm_user_init(void); From 70cf2fecf87354be09c0ab4c233fccea36256d3c Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Mon, 29 May 2023 17:44:36 -0400 Subject: [PATCH 14/19] fs: dlm: warn about messages from left nodes This patch warns about messages which are received from nodes who already left the lockspace resource signaled by the cluster manager. Before commit 489d8e559c65 ("fs: dlm: add reliable connection if reconnect") there was a synchronization issue with the socket lifetime and the cluster event of leaving a lockspace and other nodes did not stop of sending messages because the cluster manager has a pending message to leave the lockspace. The reliable session layer for dlm use sequence numbers to ensure dlm message were never being dropped. If this is not corrected synchronized we have a problem, this patch will use the filter case and turn it into a WARN_ON_ONCE() so we seeing such issue on the kernel log because it should never happen now. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lock.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index debf8a55ad7d..ede903246fa4 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -4616,7 +4616,7 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, { int error = 0, noent = 0; - if (!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid))) { + if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) { log_limit(ls, "receive %d from non-member %d %x %x %d", le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_header.h_nodeid), @@ -4754,7 +4754,7 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, /* If we were a member of this lockspace, left, and rejoined, other nodes may still be sending us messages from the lockspace generation before we left. */ - if (!ls->ls_generation) { + if (WARN_ON_ONCE(!ls->ls_generation)) { log_limit(ls, "receive %d from %d ignore old gen", le32_to_cpu(ms->m_type), nodeid); return; From 07ee38674a0b9071fa0bbec4dbda6aad1c5e4003 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Mon, 29 May 2023 17:44:37 -0400 Subject: [PATCH 15/19] fs: dlm: filter ourself midcomms calls It makes no sense to call midcomms/lowcomms functionality for the local node as socket functionality is only required for remote nodes. This patch filters those calls in the upper layer of lockspace membership handling instead of doing it in midcomms/lowcomms layer as they should never be aware of local nodeid. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/config.c | 3 ++- fs/dlm/lowcomms.c | 3 --- fs/dlm/member.c | 37 ++++++++++++++++++++++++++++++------- fs/dlm/midcomms.c | 9 --------- 4 files changed, 32 insertions(+), 20 deletions(-) diff --git a/fs/dlm/config.c b/fs/dlm/config.c index 2beceff024e3..4246cd425671 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -532,7 +532,8 @@ static void drop_comm(struct config_group *g, struct config_item *i) struct dlm_comm *cm = config_item_to_comm(i); if (local_comm == cm) local_comm = NULL; - dlm_midcomms_close(cm->nodeid); + if (!cm->local) + dlm_midcomms_close(cm->nodeid); while (cm->addr_count--) kfree(cm->addr[cm->addr_count]); config_item_put(i); diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 5a7586633cbe..345a316ae54c 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -546,9 +546,6 @@ int dlm_lowcomms_connect_node(int nodeid) struct connection *con; int idx; - if (nodeid == dlm_our_nodeid()) - return 0; - idx = srcu_read_lock(&connections_srcu); con = nodeid2con(nodeid, 0); if (WARN_ON_ONCE(!con)) { diff --git a/fs/dlm/member.c b/fs/dlm/member.c index 923c01a8a0aa..77d202e4a02a 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -307,6 +307,21 @@ static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new) } } +static int add_remote_member(int nodeid) +{ + int error; + + if (nodeid == dlm_our_nodeid()) + return 0; + + error = dlm_lowcomms_connect_node(nodeid); + if (error < 0) + return error; + + dlm_midcomms_add_member(nodeid); + return 0; +} + static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node) { struct dlm_member *memb; @@ -316,16 +331,16 @@ static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node) if (!memb) return -ENOMEM; - error = dlm_lowcomms_connect_node(node->nodeid); + memb->nodeid = node->nodeid; + memb->weight = node->weight; + memb->comm_seq = node->comm_seq; + + error = add_remote_member(node->nodeid); if (error < 0) { kfree(memb); return error; } - memb->nodeid = node->nodeid; - memb->weight = node->weight; - memb->comm_seq = node->comm_seq; - dlm_midcomms_add_member(node->nodeid); add_ordered_member(ls, memb); ls->ls_num_nodes++; return 0; @@ -370,9 +385,17 @@ static void clear_memb_list(struct list_head *head, } } +static void remove_remote_member(int nodeid) +{ + if (nodeid == dlm_our_nodeid()) + return; + + dlm_midcomms_remove_member(nodeid); +} + static void clear_members_cb(int nodeid) { - dlm_midcomms_remove_member(nodeid); + remove_remote_member(nodeid); } void dlm_clear_members(struct dlm_ls *ls) @@ -562,7 +585,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) neg++; list_move(&memb->list, &ls->ls_nodes_gone); - dlm_midcomms_remove_member(memb->nodeid); + remove_remote_member(memb->nodeid); ls->ls_num_nodes--; dlm_lsop_recover_slot(ls, memb); } diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index 3df916a568ba..9c66cb853d17 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -1280,9 +1280,6 @@ void dlm_midcomms_add_member(int nodeid) struct midcomms_node *node; int idx; - if (nodeid == dlm_our_nodeid()) - return; - idx = srcu_read_lock(&nodes_srcu); node = nodeid2node(nodeid, GFP_NOFS); if (!node) { @@ -1328,9 +1325,6 @@ void dlm_midcomms_remove_member(int nodeid) struct midcomms_node *node; int idx; - if (nodeid == dlm_our_nodeid()) - return; - idx = srcu_read_lock(&nodes_srcu); node = nodeid2node(nodeid, 0); if (!node) { @@ -1487,9 +1481,6 @@ int dlm_midcomms_close(int nodeid) struct midcomms_node *node; int idx, ret; - if (nodeid == dlm_our_nodeid()) - return 0; - idx = srcu_read_lock(&nodes_srcu); /* Abort pending close/remove operation */ node = nodeid2node(nodeid, 0); From 75a7d60134ce84209f2c61ec4619ee543aa8f466 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Mon, 29 May 2023 17:44:38 -0400 Subject: [PATCH 16/19] fs: dlm: handle lkb wait count as atomic_t Currently the lkb_wait_count is locked by the rsb lock and it should be fine to handle lkb_wait_count as non atomic_t value. However for the overall process of reducing locking this patch converts it to an atomic_t value. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/dlm_internal.h | 2 +- fs/dlm/lock.c | 32 ++++++++++++++------------------ 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 986a9d7b1f33..c8156770205e 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -246,7 +246,7 @@ struct dlm_lkb { int8_t lkb_highbast; /* highest mode bast sent for */ int8_t lkb_wait_type; /* type of reply waiting for */ - int8_t lkb_wait_count; + atomic_t lkb_wait_count; int lkb_wait_nodeid; /* for debugging */ struct list_head lkb_statequeue; /* rsb g/c/w list */ diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index ede903246fa4..f511a9d7d416 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1407,6 +1407,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; int error = 0; + int wc; mutex_lock(&ls->ls_waiters_mutex); @@ -1428,20 +1429,17 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) error = -EBUSY; goto out; } - lkb->lkb_wait_count++; + wc = atomic_inc_return(&lkb->lkb_wait_count); hold_lkb(lkb); log_debug(ls, "addwait %x cur %d overlap %d count %d f %x", - lkb->lkb_id, lkb->lkb_wait_type, mstype, - lkb->lkb_wait_count, dlm_iflags_val(lkb)); + lkb->lkb_id, lkb->lkb_wait_type, mstype, wc, + dlm_iflags_val(lkb)); goto out; } - DLM_ASSERT(!lkb->lkb_wait_count, - dlm_print_lkb(lkb); - printk("wait_count %d\n", lkb->lkb_wait_count);); - - lkb->lkb_wait_count++; + wc = atomic_fetch_inc(&lkb->lkb_wait_count); + DLM_ASSERT(!wc, dlm_print_lkb(lkb); printk("wait_count %d\n", wc);); lkb->lkb_wait_type = mstype; lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */ hold_lkb(lkb); @@ -1504,7 +1502,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, log_debug(ls, "remwait %x convert_reply zap overlap_cancel", lkb->lkb_id); lkb->lkb_wait_type = 0; - lkb->lkb_wait_count--; + atomic_dec(&lkb->lkb_wait_count); unhold_lkb(lkb); goto out_del; } @@ -1531,16 +1529,15 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, if (overlap_done && lkb->lkb_wait_type) { log_error(ls, "remwait error %x reply %d wait_type %d overlap", lkb->lkb_id, mstype, lkb->lkb_wait_type); - lkb->lkb_wait_count--; + atomic_dec(&lkb->lkb_wait_count); unhold_lkb(lkb); lkb->lkb_wait_type = 0; } - DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb);); + DLM_ASSERT(atomic_read(&lkb->lkb_wait_count), dlm_print_lkb(lkb);); clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); - lkb->lkb_wait_count--; - if (!lkb->lkb_wait_count) + if (atomic_dec_and_test(&lkb->lkb_wait_count)) list_del_init(&lkb->lkb_wait_reply); unhold_lkb(lkb); return 0; @@ -2669,7 +2666,7 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, goto out; /* lock not allowed if there's any op in progress */ - if (lkb->lkb_wait_type || lkb->lkb_wait_count) + if (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count)) goto out; if (is_overlap(lkb)) @@ -2731,7 +2728,7 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) /* normal unlock not allowed if there's any op in progress */ if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) && - (lkb->lkb_wait_type || lkb->lkb_wait_count)) + (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count))) goto out; /* an lkb may be waiting for an rsb lookup to complete where the @@ -5066,10 +5063,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) /* drop all wait_count references we still * hold a reference for this iteration. */ - while (lkb->lkb_wait_count) { - lkb->lkb_wait_count--; + while (!atomic_dec_and_test(&lkb->lkb_wait_count)) unhold_lkb(lkb); - } + mutex_lock(&ls->ls_waiters_mutex); list_del_init(&lkb->lkb_wait_reply); mutex_unlock(&ls->ls_waiters_mutex); From d00725cab226491fc347d9bc42b881a0a35419cf Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Mon, 29 May 2023 17:44:39 -0400 Subject: [PATCH 17/19] fs: dlm: handle sequence numbers as atomic Currently seq_next is only be read on the receive side which processed in an ordered way. The seq_send is being protected by locks. To being able to read the seq_next value on send side as well we convert it to an atomic_t value. The atomic_cmpxchg() is probably not necessary, however the atomic_inc() depends on a if coniditional and this should be handled in an atomic context. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/midcomms.c | 40 +++++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index 9c66cb853d17..70286737eb0a 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -152,8 +152,8 @@ struct midcomms_node { int nodeid; uint32_t version; - uint32_t seq_send; - uint32_t seq_next; + atomic_t seq_send; + atomic_t seq_next; /* These queues are unbound because we cannot drop any message in dlm. * We could send a fence signal for a specific node to the cluster * manager if queues hits some maximum value, however this handling @@ -317,8 +317,8 @@ static void midcomms_node_reset(struct midcomms_node *node) { pr_debug("reset node %d\n", node->nodeid); - node->seq_next = DLM_SEQ_INIT; - node->seq_send = DLM_SEQ_INIT; + atomic_set(&node->seq_next, DLM_SEQ_INIT); + atomic_set(&node->seq_send, DLM_SEQ_INIT); node->version = DLM_VERSION_NOT_SET; node->flags = 0; @@ -492,9 +492,19 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, struct midcomms_node *node, uint32_t seq) { - if (seq == node->seq_next) { - node->seq_next++; + bool is_expected_seq; + uint32_t oval, nval; + do { + oval = atomic_read(&node->seq_next); + is_expected_seq = (oval == seq); + if (!is_expected_seq) + break; + + nval = oval + 1; + } while (atomic_cmpxchg(&node->seq_next, oval, nval) != oval); + + if (is_expected_seq) { switch (p->header.h_cmd) { case DLM_FIN: spin_lock(&node->state_lock); @@ -503,7 +513,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, switch (node->state) { case DLM_ESTABLISHED: - dlm_send_ack(node->nodeid, node->seq_next); + dlm_send_ack(node->nodeid, nval); /* passive shutdown DLM_LAST_ACK case 1 * additional we check if the node is used by @@ -522,14 +532,14 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, } break; case DLM_FIN_WAIT1: - dlm_send_ack(node->nodeid, node->seq_next); + dlm_send_ack(node->nodeid, nval); node->state = DLM_CLOSING; set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags); pr_debug("switch node %d to state %s\n", node->nodeid, dlm_state_str(node->state)); break; case DLM_FIN_WAIT2: - dlm_send_ack(node->nodeid, node->seq_next); + dlm_send_ack(node->nodeid, nval); midcomms_node_reset(node); pr_debug("switch node %d to state %s\n", node->nodeid, dlm_state_str(node->state)); @@ -557,11 +567,11 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, /* retry to ack message which we already have by sending back * current node->seq_next number as ack. */ - if (seq < node->seq_next) - dlm_send_ack(node->nodeid, node->seq_next); + if (seq < oval) + dlm_send_ack(node->nodeid, oval); log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d", - seq, node->seq_next, node->nodeid); + seq, oval, node->nodeid); } } @@ -992,7 +1002,7 @@ void dlm_midcomms_receive_done(int nodeid) switch (node->state) { case DLM_ESTABLISHED: spin_unlock(&node->state_lock); - dlm_send_ack(node->nodeid, node->seq_next); + dlm_send_ack(node->nodeid, atomic_read(&node->seq_next)); break; default: spin_unlock(&node->state_lock); @@ -1058,7 +1068,7 @@ static void midcomms_new_msg_cb(void *data) list_add_tail_rcu(&mh->list, &mh->node->send_queue); spin_unlock_bh(&mh->node->send_queue_lock); - mh->seq = mh->node->seq_send++; + mh->seq = atomic_fetch_inc(&mh->node->seq_send); } static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid, @@ -1530,7 +1540,7 @@ static void midcomms_new_rawmsg_cb(void *data) switch (h->h_cmd) { case DLM_OPTS: if (!h->u.h_seq) - h->u.h_seq = cpu_to_le32(rd->node->seq_send++); + h->u.h_seq = cpu_to_le32(atomic_fetch_inc(&rd->node->seq_send)); break; default: break; From 1696c75f1864b338fccbc55f278039d199287ec3 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Mon, 29 May 2023 17:44:40 -0400 Subject: [PATCH 18/19] fs: dlm: add send ack threshold and append acks to msgs This patch changes the time when we sending an ack back to tell the other side it can free some message because it is arrived on the receiver node, due random reconnects e.g. TCP resets this is handled as well on application layer to not let DLM run into a deadlock state. The current handling has the following problems: 1. We end in situations that we only send an ack back message of 16 bytes out and no other messages. Whereas DLM has logic to combine so much messages as it can in one send() socket call. This behaviour can be discovered by "trace-cmd start -e dlm_recv" and observing the ret field being 16 bytes. 2. When processing of DLM messages will never end because we receive a lot of messages, we will not send an ack back as it happens when the processing loop ends. This patch introduces a likely and unlikely threshold case. The likely case will send an ack back on a transmit path if the threshold is triggered of amount of processed upper layer protocol. This will solve issue 1 because it will be send when another normal DLM message will be sent. It solves issue 2 because it is not part of the processing loop. There is however a unlikely case, the unlikely case has a bigger threshold and will be triggered when we only receive messages and do not sent any message back. This case avoids that the sending node will keep a lot of message for a long time as we send sometimes ack backs to tell the sender to finally release messages. The atomic cmpxchg() is there to provide a atomically ack send with reset of the upper layer protocol delivery counter. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lowcomms.c | 30 ------------------- fs/dlm/midcomms.c | 76 +++++++++++++++++++---------------------------- 2 files changed, 31 insertions(+), 75 deletions(-) diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 345a316ae54c..68092f953830 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -860,30 +860,8 @@ struct dlm_processed_nodes { struct list_head list; }; -static void add_processed_node(int nodeid, struct list_head *processed_nodes) -{ - struct dlm_processed_nodes *n; - - list_for_each_entry(n, processed_nodes, list) { - /* we already remembered this node */ - if (n->nodeid == nodeid) - return; - } - - /* if it's fails in worst case we simple don't send an ack back. - * We try it next time. - */ - n = kmalloc(sizeof(*n), GFP_NOFS); - if (!n) - return; - - n->nodeid = nodeid; - list_add(&n->list, processed_nodes); -} - static void process_dlm_messages(struct work_struct *work) { - struct dlm_processed_nodes *n, *n_tmp; struct processqueue_entry *pentry; LIST_HEAD(processed_nodes); @@ -902,7 +880,6 @@ static void process_dlm_messages(struct work_struct *work) for (;;) { dlm_process_incoming_buffer(pentry->nodeid, pentry->buf, pentry->buflen); - add_processed_node(pentry->nodeid, &processed_nodes); free_processqueue_entry(pentry); spin_lock(&processqueue_lock); @@ -917,13 +894,6 @@ static void process_dlm_messages(struct work_struct *work) list_del(&pentry->list); spin_unlock(&processqueue_lock); } - - /* send ack back after we processed couple of messages */ - list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) { - list_del(&n->list); - dlm_midcomms_receive_done(n->nodeid); - kfree(n); - } } /* Data received from remote end */ diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index 70286737eb0a..e1a0df67b566 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -148,6 +148,8 @@ /* 5 seconds wait to sync ending of dlm */ #define DLM_SHUTDOWN_TIMEOUT msecs_to_jiffies(5000) #define DLM_VERSION_NOT_SET 0 +#define DLM_SEND_ACK_BACK_MSG_THRESHOLD 32 +#define DLM_RECV_ACK_BACK_MSG_THRESHOLD (DLM_SEND_ACK_BACK_MSG_THRESHOLD * 8) struct midcomms_node { int nodeid; @@ -165,7 +167,7 @@ struct midcomms_node { #define DLM_NODE_FLAG_CLOSE 1 #define DLM_NODE_FLAG_STOP_TX 2 #define DLM_NODE_FLAG_STOP_RX 3 -#define DLM_NODE_ULP_DELIVERED 4 + atomic_t ulp_delivered; unsigned long flags; wait_queue_head_t shutdown_wait; @@ -319,6 +321,7 @@ static void midcomms_node_reset(struct midcomms_node *node) atomic_set(&node->seq_next, DLM_SEQ_INIT); atomic_set(&node->seq_send, DLM_SEQ_INIT); + atomic_set(&node->ulp_delivered, 0); node->version = DLM_VERSION_NOT_SET; node->flags = 0; @@ -393,6 +396,28 @@ static int dlm_send_ack(int nodeid, uint32_t seq) return 0; } +static void dlm_send_ack_threshold(struct midcomms_node *node, + uint32_t threshold) +{ + uint32_t oval, nval; + bool send_ack; + + /* let only send one user trigger threshold to send ack back */ + do { + oval = atomic_read(&node->ulp_delivered); + send_ack = (oval > threshold); + /* abort if threshold is not reached */ + if (!send_ack) + break; + + nval = 0; + /* try to reset ulp_delivered counter */ + } while (atomic_cmpxchg(&node->ulp_delivered, oval, nval) != oval); + + if (send_ack) + dlm_send_ack(node->nodeid, atomic_read(&node->seq_next)); +} + static int dlm_send_fin(struct midcomms_node *node, void (*ack_rcv)(struct midcomms_node *node)) { @@ -560,7 +585,9 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags)); dlm_receive_buffer_3_2_trace(seq, p); dlm_receive_buffer(p, node->nodeid); - set_bit(DLM_NODE_ULP_DELIVERED, &node->flags); + atomic_inc(&node->ulp_delivered); + /* unlikely case to send ack back when we don't transmit */ + dlm_send_ack_threshold(node, DLM_RECV_ACK_BACK_MSG_THRESHOLD); break; } } else { @@ -969,49 +996,6 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len) return ret; } -void dlm_midcomms_receive_done(int nodeid) -{ - struct midcomms_node *node; - int idx; - - idx = srcu_read_lock(&nodes_srcu); - node = nodeid2node(nodeid, 0); - if (!node) { - srcu_read_unlock(&nodes_srcu, idx); - return; - } - - /* old protocol, we do nothing */ - switch (node->version) { - case DLM_VERSION_3_2: - break; - default: - srcu_read_unlock(&nodes_srcu, idx); - return; - } - - /* do nothing if we didn't delivered stateful to ulp */ - if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED, - &node->flags)) { - srcu_read_unlock(&nodes_srcu, idx); - return; - } - - spin_lock(&node->state_lock); - /* we only ack if state is ESTABLISHED */ - switch (node->state) { - case DLM_ESTABLISHED: - spin_unlock(&node->state_lock); - dlm_send_ack(node->nodeid, atomic_read(&node->seq_next)); - break; - default: - spin_unlock(&node->state_lock); - /* do nothing FIN has it's own ack send */ - break; - } - srcu_read_unlock(&nodes_srcu, idx); -} - void dlm_midcomms_unack_msg_resend(int nodeid) { struct midcomms_node *node; @@ -1142,6 +1126,8 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, goto err; } + /* send ack back if necessary */ + dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD); break; default: dlm_free_mhandle(mh); From fc4ea4229c2b2dca0bffc12eee6973e353c20086 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 20 Jun 2023 10:15:57 -0400 Subject: [PATCH 19/19] fs: dlm: remove filter local comms on close The current way how lowcomms is configured is due configfs entries. Each comms configfs entry will create a lowcomms connection. Even the local connection itself will be stored as a lowcomms connection, although most functionality for a local lowcomms connection struct is not necessary. Now in some scenarios we will see that dlm_controld reports a -EEXIST when configure a node via configfs: ... /sys/kernel/config/dlm/cluster/comms/1/addr: write failed: 17 -1 Doing a: cat /sys/kernel/config/dlm/cluster/comms/1/addr_list reported nothing. This was being seen on cluster with nodeid 1 and it's local configuration. To be sure the configfs entries are in sync with lowcomms connection structures we always call dlm_midcomms_close() to be sure the lowcomms connection gets removed when the configfs entry gets dropped. Before commit 07ee38674a0b ("fs: dlm: filter ourself midcomms calls") it was just doing this by accident and the filter by doing: if (nodeid == dlm_our_nodeid()) return 0; inside dlm_midcomms_close() was never been hit because drop_comm() sets local_comm to NULL and cause that dlm_our_nodeid() returns always the invalid nodeid 0. Fixes: 07ee38674a0b ("fs: dlm: filter ourself midcomms calls") Cc: stable@vger.kernel.org Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/config.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fs/dlm/config.c b/fs/dlm/config.c index 4246cd425671..2beceff024e3 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -532,8 +532,7 @@ static void drop_comm(struct config_group *g, struct config_item *i) struct dlm_comm *cm = config_item_to_comm(i); if (local_comm == cm) local_comm = NULL; - if (!cm->local) - dlm_midcomms_close(cm->nodeid); + dlm_midcomms_close(cm->nodeid); while (cm->addr_count--) kfree(cm->addr[cm->addr_count]); config_item_put(i);