diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 818484315906..e06fa17c5603 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -333,6 +333,7 @@ struct dlm_rsb {
 	struct list_head	res_masters_list;	/* used for recovery */
 	struct list_head	res_recover_list;	/* used for recovery */
 	int			res_recover_locks_count;
+	struct rcu_head		rcu;
 
 	char			*res_lvbptr;
 	char			res_name[DLM_RESNAME_MAXLEN+1];
@@ -366,6 +367,7 @@ enum rsb_flags {
 	RSB_RECOVER_GRANT,
 	RSB_RECOVER_LVB_INVAL,
 	RSB_INACTIVE,
+	RSB_HASHED, /* set while rsb is on ls_rsbtbl */
 };
 
 static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 5ca3f29bef7d..8bee4f444afd 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -563,6 +563,7 @@ void dlm_rsb_scan(struct timer_list *timer)
 		list_del(&r->res_slow_list);
 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 				       dlm_rhash_rsb_params);
+		rsb_clear_flag(r, RSB_HASHED);
 
 		/* ls_rsbtbl_lock is not needed when calling send_remove() */
 		write_unlock(&ls->ls_rsbtbl_lock);
@@ -636,8 +637,14 @@ int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
 
 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
 {
-	return rhashtable_insert_fast(rhash, &rsb->res_node,
-				      dlm_rhash_rsb_params);
+	int rv;
+
+	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
+				    dlm_rhash_rsb_params);
+	if (!rv)
+		rsb_set_flag(rsb, RSB_HASHED);
+
+	return rv;
 }
 
 /*
@@ -752,11 +759,23 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
  do_inactive:
 	write_lock_bh(&ls->ls_rsbtbl_lock);
 
-	/* retry lookup under write lock to see if its still in inactive state
-	 * if not it's in active state and we relookup - unlikely path.
+	/*
+	 * The expectation here is that the rsb will have HASHED and
+	 * INACTIVE flags set, and that the rsb can be moved from
+	 * inactive back to active again.  However, between releasing
+	 * the read lock and acquiring the write lock, this rsb could
+	 * have been removed from rsbtbl, and had HASHED cleared, to
+	 * be freed.  To deal with this case, we would normally need
+	 * to repeat dlm_search_rsb_tree while holding the write lock,
+	 * but rcu allows us to simply check the HASHED flag, because
+	 * the rcu read lock means the rsb will not be freed yet.
+	 * If the HASHED flag is not set, then the rsb is being freed,
+	 * so we add a new rsb struct.  If the HASHED flag is set,
+	 * and INACTIVE is not set, it means another thread has
+	 * made the rsb active, as we're expecting to do here, and
+	 * we just repeat the lookup (this will be very unlikely.)
 	 */
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	if (!error) {
+	if (rsb_flag(r, RSB_HASHED)) {
 		if (!rsb_flag(r, RSB_INACTIVE)) {
 			write_unlock_bh(&ls->ls_rsbtbl_lock);
 			goto retry;
@@ -926,11 +945,8 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
  do_inactive:
 	write_lock_bh(&ls->ls_rsbtbl_lock);
 
-	/* retry lookup under write lock to see if its still inactive.
-	 * if it's active, repeat lookup - unlikely path.
-	 */
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	if (!error) {
+	/* See comment in find_rsb_dir. */
+	if (rsb_flag(r, RSB_HASHED)) {
 		if (!rsb_flag(r, RSB_INACTIVE)) {
 			write_unlock_bh(&ls->ls_rsbtbl_lock);
 			goto retry;
@@ -1012,12 +1028,54 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	return error;
 }
 
+/*
+ * rsb rcu usage
+ *
+ * While rcu read lock is held, the rsb cannot be freed,
+ * which allows a lookup optimization.
+ *
+ * Two threads are accessing the same rsb concurrently,
+ * the first (A) is trying to use the rsb, the second (B)
+ * is trying to free the rsb.
+ *
+ * thread A                          thread B
+ * (trying to use rsb)               (trying to free rsb)
+ *
+ * A1. rcu read lock
+ * A2. rsbtbl read lock
+ * A3. look up rsb in rsbtbl
+ * A4. rsbtbl read unlock
+ *                                   B1. rsbtbl write lock
+ *                                   B2. look up rsb in rsbtbl
+ *                                   B3. remove rsb from rsbtbl
+ *                                   B4. clear rsb HASHED flag
+ *                                   B5. rsbtbl write unlock
+ *                                   B6. begin freeing rsb using rcu...
+ *
+ * (rsb is inactive, so try to make it active again)
+ * A5. read rsb HASHED flag (safe because rsb is not freed yet)
+ * A6. the rsb HASHED flag is not set, which means the rsb
+ *     is being removed from rsbtbl and freed, so don't use it.
+ * A7. rcu read unlock
+ *
+ *                                   B7. ...finish freeing rsb using rcu
+ * A8. create a new rsb
+ *
+ * Without the rcu optimization, steps A5-8 would need to do
+ * an extra rsbtbl lookup:
+ * A5. rsbtbl write lock
+ * A6. look up rsb in rsbtbl, not found
+ * A7. rsbtbl write unlock
+ * A8. create a new rsb
+ */
+
 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
 		    int from_nodeid, unsigned int flags,
 		    struct dlm_rsb **r_ret)
 {
 	int dir_nodeid;
 	uint32_t hash;
+	int rv;
 
 	if (len > DLM_RESNAME_MAXLEN)
 		return -EINVAL;
@@ -1025,12 +1083,15 @@ static int find_rsb(struct dlm_ls *ls, const void *name, int len,
 	hash = jhash(name, len, 0);
 	dir_nodeid = dlm_hash2nodeid(ls, hash);
 
+	rcu_read_lock();
 	if (dlm_no_directory(ls))
-		return find_rsb_nodir(ls, name, len, hash, dir_nodeid,
+		rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
 				      from_nodeid, flags, r_ret);
 	else
-		return find_rsb_dir(ls, name, len, hash, dir_nodeid,
+		rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
 				    from_nodeid, flags, r_ret);
+	rcu_read_unlock();
+	return rv;
 }
 
 /* we have received a request and found that res_master_nodeid != our_nodeid,
@@ -1187,8 +1248,8 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
  *    . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
 */
 
-int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
-		      int len, unsigned int flags, int *r_nodeid, int *result)
+static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
+			      int len, unsigned int flags, int *r_nodeid, int *result)
 {
 	struct dlm_rsb *r = NULL;
 	uint32_t hash;
@@ -1315,6 +1376,16 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	return error;
 }
 
+int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
+		      int len, unsigned int flags, int *r_nodeid, int *result)
+{
+	int rv;
+	rcu_read_lock();
+	rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result);
+	rcu_read_unlock();
+	return rv;
+}
+
 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 {
 	struct dlm_rsb *r;
@@ -4293,6 +4364,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 	list_del(&r->res_slow_list);
 	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 			       dlm_rhash_rsb_params);
+	rsb_clear_flag(r, RSB_HASHED);
 	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	free_inactive_rsb(r);
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index 15a8b1cee433..105a79978706 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -101,13 +101,19 @@ struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls)
 	return r;
 }
 
-void dlm_free_rsb(struct dlm_rsb *r)
+static void __free_rsb_rcu(struct rcu_head *rcu)
 {
+	struct dlm_rsb *r = container_of(rcu, struct dlm_rsb, rcu);
 	if (r->res_lvbptr)
 		dlm_free_lvb(r->res_lvbptr);
 	kmem_cache_free(rsb_cache, r);
 }
 
+void dlm_free_rsb(struct dlm_rsb *r)
+{
+	call_rcu(&r->rcu, __free_rsb_rcu);
+}
+
 struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls)
 {
 	struct dlm_lkb *lkb;
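
For illustration, the locking scheme this patch introduces reduces to the condensed sketch below. It is not code from the patch: the names (obj, obj_table_lock, obj_lookup, obj_unhash, OBJ_HASHED) are hypothetical stand-ins for the rsb equivalents, and the INACTIVE handling and retry path of find_rsb_dir are omitted. It demonstrates the point made in the "rsb rcu usage" comment: because rcu_read_lock() keeps the object allocated across the read-to-write lock upgrade, a flag test can stand in for a second hash table lookup, and the free side must defer the actual kfree() with call_rcu().

/*
 * Condensed sketch of the rcu lookup/free pattern above.
 * All names here are hypothetical, not taken from fs/dlm.
 */
#include <linux/bitops.h>
#include <linux/container_of.h>
#include <linux/rcupdate.h>
#include <linux/slab.h>
#include <linux/spinlock.h>

#define OBJ_HASHED 0	/* set while the object is on the table */

struct obj {
	unsigned long flags;
	struct rcu_head rcu;
};

static DEFINE_RWLOCK(obj_table_lock);

/* Stand-ins for the rhashtable lookup/remove wrappers. */
struct obj *obj_lookup(const void *name, int len);
void obj_unhash(struct obj *o);

/* Reader side, steps A1-A7 of the comment. */
static struct obj *obj_find(const void *name, int len)
{
	struct obj *o;

	rcu_read_lock();			/* A1 */
	read_lock_bh(&obj_table_lock);		/* A2 */
	o = obj_lookup(name, len);		/* A3 */
	read_unlock_bh(&obj_table_lock);	/* A4 */
	if (!o)
		goto out;

	write_lock_bh(&obj_table_lock);
	/* o cannot be freed while the rcu read lock is held, so
	 * testing the HASHED flag (A5/A6) replaces a second
	 * lookup under the write lock.
	 */
	if (!test_bit(OBJ_HASHED, &o->flags))
		o = NULL;	/* being freed; caller creates a new object (A8) */
	write_unlock_bh(&obj_table_lock);
out:
	rcu_read_unlock();			/* A7 */
	return o;
}

static void obj_free_rcu(struct rcu_head *rcu)
{
	kfree(container_of(rcu, struct obj, rcu));	/* B7 */
}

/* Free side, steps B1-B6 of the comment. */
static void obj_remove(struct obj *o)
{
	write_lock_bh(&obj_table_lock);		/* B1 */
	obj_unhash(o);				/* B3 */
	clear_bit(OBJ_HASHED, &o->flags);	/* B4 */
	write_unlock_bh(&obj_table_lock);	/* B5 */
	call_rcu(&o->rcu, obj_free_rcu);	/* B6 */
}

Note that clearing HASHED under the table write lock is what makes the reader's flag test race-free: any reader that acquires the write lock after obj_remove() has released it is guaranteed to observe the cleared flag, and the rcu grace period guarantees the object itself outlives the reader's critical section.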