From 1e8dc1da0de0add40d56fcbab9b64a3c5e61b6dd Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Thu, 8 Feb 2024 20:29:16 +0800 Subject: [PATCH] Fix crash due to merge of quicklist node introduced by #12955 (#13040) Fix two crash introducted by #12955 When a quicklist node can't be inserted and split, we eventually merge the current node with its neighboring nodes after inserting, and compress the current node and its siblings. 1. When the current node is merged with another node, the current node may become invalid and can no longer be used. Solution: let `_quicklistMergeNodes()` return the merged nodes. 3. If the current node is a LZF quicklist node, its recompress will be 1. If the split node can be merged with a sibling node to become head or tail, recompress may cause the head and tail to be compressed, which is not allowed. Solution: always recompress to 0 after merging. --- src/quicklist.c | 32 +++++++++++++++++++++------ tests/unit/type/list.tcl | 48 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 72 insertions(+), 8 deletions(-) diff --git a/src/quicklist.c b/src/quicklist.c index f8cb62e23..3b1187897 100644 --- a/src/quicklist.c +++ b/src/quicklist.c @@ -105,7 +105,7 @@ quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *no void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm); quicklistNode *_quicklistSplitNode(quicklistNode *node, int offset, int after); -void _quicklistMergeNodes(quicklist *quicklist, quicklistNode *center); +quicklistNode *_quicklistMergeNodes(quicklist *quicklist, quicklistNode *center); /* Simple way to give quicklistEntry structs default values with one call. */ #define initEntry(e) \ @@ -381,6 +381,15 @@ REDIS_STATIC void __quicklistCompress(const quicklist *quicklist, quicklistCompressNode(reverse); } +/* This macro is used to compress a node. + * + * If the 'recompress' flag of the node is true, we compress it directly without + * checking whether it is within the range of compress depth. + * However, it's important to ensure that the 'recompress' flag of head and tail + * is always false, as we always assume that head and tail are not compressed. + * + * If the 'recompress' flag of the node is false, we check whether the node is + * within the range of compress depth before compressing it. */ #define quicklistCompress(_ql, _node) \ do { \ if ((_node)->recompress) \ @@ -794,9 +803,14 @@ void quicklistReplaceEntry(quicklistIter *iter, quicklistEntry *entry, unsigned char *p = lpSeek(entry->node->entry, -1); quicklistDelIndex(quicklist, entry->node, &p); entry->node->dont_compress = 0; /* Re-enable compression */ - _quicklistMergeNodes(quicklist, entry->node); - quicklistCompress(quicklist, entry->node); - quicklistCompress(quicklist, entry->node->next); + new_node = _quicklistMergeNodes(quicklist, new_node); + /* We can't know if the current node and its sibling nodes are correctly compressed, + * and we don't know if they are within the range of compress depth, so we need to + * use quicklistCompress() for compression, which checks if node is within compress + * depth before compressing. */ + quicklistCompress(quicklist, new_node); + quicklistCompress(quicklist, new_node->prev); + if (new_node->next) quicklistCompress(quicklist, new_node->next); } } @@ -854,6 +868,8 @@ REDIS_STATIC quicklistNode *_quicklistListpackMerge(quicklist *quicklist, } keep->count = lpLength(keep->entry); quicklistNodeUpdateSz(keep); + keep->recompress = 0; /* Prevent 'keep' from being recompressed if + * it becomes head or tail after merging. */ nokeep->count = 0; __quicklistDelNode(quicklist, nokeep); @@ -872,9 +888,10 @@ REDIS_STATIC quicklistNode *_quicklistListpackMerge(quicklist *quicklist, * - (center->next, center->next->next) * - (center->prev, center) * - (center, center->next) + * + * Returns the new 'center' after merging. */ -REDIS_STATIC void _quicklistMergeNodes(quicklist *quicklist, - quicklistNode *center) { +REDIS_STATIC quicklistNode *_quicklistMergeNodes(quicklist *quicklist, quicklistNode *center) { int fill = quicklist->fill; quicklistNode *prev, *prev_prev, *next, *next_next, *target; prev = prev_prev = next = next_next = target = NULL; @@ -914,8 +931,9 @@ REDIS_STATIC void _quicklistMergeNodes(quicklist *quicklist, /* Use result of center merge (or original) to merge with next node. */ if (_quicklistNodeAllowMerge(target, target->next, fill)) { - _quicklistListpackMerge(quicklist, target, target->next); + target = _quicklistListpackMerge(quicklist, target, target->next); } + return target; } /* Split 'node' into two parts, parameterized by 'offset' and 'after'. diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 55ff86545..6a02960d4 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -394,7 +394,53 @@ if {[lindex [r config get proto-max-bulk-len] 1] == 10000000000} { r ping } {PONG} {large-memory} - test {Test LMOVE on plain nodes over 4GB} { + test {Test LSET splits a quicklist node, and then merge} { + # Test when a quicklist node can't be inserted and is split, the split + # node merges with the node before it and the `before` node is kept. + r flushdb + r rpush lst [string repeat "x" 4096] + r lpush lst a b c d e f g + r lpush lst [string repeat "y" 4096] + # now: [y...] [g f e d c b a x...] + # (node0) (node1) + # Keep inserting elements into node1 until node1 is split into two + # nodes([g] [...]), eventually node0 will merge with the [g] node. + # Since node0 is larger, after the merge node0 will be kept and + # the [g] node will be deleted. + for {set i 7} {$i >= 3} {incr i -1} { + r write "*4\r\n\$4\r\nlset\r\n\$3\r\nlst\r\n\$1\r\n$i\r\n" + write_big_bulk 1000000000 + } + assert_equal "g" [r lindex lst 1] + r ping + } {PONG} {large-memory} + + test {Test LSET splits a LZF compressed quicklist node, and then merge} { + # Test when a LZF compressed quicklist node can't be inserted and is split, + # the split node merges with the node before it and the split node is kept. + r flushdb + r config set list-compress-depth 1 + r lpush lst [string repeat "x" 2000] + r rpush lst [string repeat "y" 7000] + r rpush lst a b c d e f g + r rpush lst [string repeat "z" 8000] + r lset lst 0 h + # now: [h] [y... a b c d e f g] [z...] + # node0 node1(LZF) + # Keep inserting elements into node1 until node1 is split into two + # nodes([y...] [...]), eventually node0 will merge with the [y...] node. + # Since [y...] node is larger, after the merge node0 will be deleted and + # the [y...] node will be kept. + for {set i 7} {$i >= 3} {incr i -1} { + r write "*4\r\n\$4\r\nlset\r\n\$3\r\nlst\r\n\$1\r\n$i\r\n" + write_big_bulk 1000000000 + } + assert_equal "h" [r lindex lst 0] + r config set list-compress-depth 0 + r ping + } {PONG} {large-memory} + + test {Test LMOVE on plain nodes over 4GB} { r flushdb r RPUSH lst2{t} "aa" r RPUSH lst2{t} "bb"