Slot migration improvement (#445)

Ping Xie 2024-05-06 21:40:28 -07:00 committed by GitHub
parent e2aec3b1a2
commit 6e7af9471c
18 changed files with 1117 additions and 334 deletions

View File

@ -187,7 +187,8 @@ void unblockClient(client *c, int queue_for_reprocessing) {
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
} else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF) {
} else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF ||
c->bstate.btype == BLOCKED_WAIT_PREREPL) {
unblockClientWaitingReplicas(c);
} else if (c->bstate.btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
@ -203,7 +204,8 @@ void unblockClient(client *c, int queue_for_reprocessing) {
/* Reset the client for a new query, unless the client has a pending command to process
* or a shutdown operation was canceled and we are still in the processCommand sequence */
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) {
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN &&
c->bstate.btype != BLOCKED_WAIT_PREREPL) {
freeClientOriginalArgv(c);
/* Clients that are not blocked on keys are not reprocessed so we must
* call reqresAppendResponse here (for clients blocked on key,
@ -241,6 +243,8 @@ void replyToBlockedClientTimedOut(client *c) {
addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset));
} else if (c->bstate.btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c, 0);
} else if (c->bstate.btype == BLOCKED_WAIT_PREREPL) {
addReplyErrorObject(c, shared.noreplicaserr);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
}
@ -598,23 +602,30 @@ static void handleClientsBlockedOnKey(readyList *rl) {
}
}
/* block a client due to wait command */
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
c->bstate.timeout = timeout;
c->bstate.reploffset = offset;
c->bstate.numreplicas = numreplicas;
listAddNodeHead(server.clients_waiting_acks,c);
blockClient(c,BLOCKED_WAIT);
}
/* block a client due to waitaof command */
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) {
/* block a client for replica acknowledgement */
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int btype, int numlocal) {
c->bstate.timeout = timeout;
c->bstate.reploffset = offset;
c->bstate.numreplicas = numreplicas;
c->bstate.numlocal = numlocal;
listAddNodeHead(server.clients_waiting_acks,c);
blockClient(c,BLOCKED_WAITAOF);
listAddNodeHead(server.clients_waiting_acks, c);
blockClient(c, btype);
}
/* block a client due to pre-replication */
void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT_PREREPL, 0);
c->flags |= CLIENT_PENDING_COMMAND;
}
/* block a client due to wait command */
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT, 0);
}
/* block a client due to waitaof command */
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAITAOF, numlocal);
}
/* Postpone client from executing a command. For example the server might be busy
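The new blockForPreReplication() keeps CLIENT_PENDING_COMMAND set, so the blocked command is re-executed once processClientsWaitingReplicas() has seen enough replica ACKs and marked the client with CLIENT_PREREPL_DONE (see the replication.c and networking.c hunks below). A minimal caller sketch, assuming a hypothetical helper name; this is not the code used by CLUSTER SETSLOT:

#include "server.h" /* client, server, mstime(), blockForPreReplication(), ... */

/* Hypothetical helper: block the calling client until the attached replicas
 * have acknowledged the current replication offset, then let the command be
 * re-processed. Returns 1 if the client was blocked (caller should bail out),
 * 0 if the command can proceed immediately. */
static int maybeBlockForPreReplication(client *c, mstime_t timeout_ms) {
    /* Second pass: pre-replication already completed, run the command. */
    if (c->flags & CLIENT_PREREPL_DONE) return 0;
    /* No replicas attached: nothing to wait for. */
    if (listLength(server.slaves) == 0) return 0;
    /* blockForPreReplication() also sets CLIENT_PENDING_COMMAND so the
     * command is retried after the client is unblocked. */
    blockForPreReplication(c, mstime() + timeout_ms,
                           server.master_repl_offset,
                           listLength(server.slaves));
    replicationRequestAckFromSlaves();
    return 1; /* unblocked later by processClientsWaitingReplicas() */
}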

View File

@ -103,6 +103,7 @@ char *clusterNodeHostname(clusterNode *node);
const char *clusterNodePreferredEndpoint(clusterNode *n);
long long clusterNodeReplOffset(clusterNode *node);
clusterNode *clusterLookupNode(const char *name, int length);
void clusterReplicateOpenSlots(void);
/* functions with shared implementations */
clusterNode *getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);

File diff suppressed because it is too large
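The suppressed file contains the bulk of the change, including the implementation of clusterReplicateOpenSlots() declared in cluster.h above. A rough sketch of what such a helper plausibly does, assuming the existing migrating_slots_to/importing_slots_from arrays of the legacy cluster state and the shared CLUSTER/SETSLOT/MIGRATING/IMPORTING objects added later in this commit (not the verbatim implementation):

/* Sketch: feed "CLUSTER SETSLOT <slot> MIGRATING|IMPORTING <node-id>" into
 * the replication stream for every open slot, so replicas track the same
 * slot-migration state as their primary. */
void clusterReplicateOpenSlots(void) {
    if (!server.cluster_enabled) return;
    robj *argv[5];
    argv[0] = shared.cluster;
    argv[1] = shared.setslot;
    for (int i = 0; i < 2; i++) {
        clusterNode **nodes = (i == 0) ? server.cluster->importing_slots_from
                                       : server.cluster->migrating_slots_to;
        argv[3] = (i == 0) ? shared.importing : shared.migrating;
        for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
            if (nodes[slot] == NULL) continue;
            argv[2] = createStringObjectFromLongLong(slot);
            argv[4] = createStringObject(nodes[slot]->name, CLUSTER_NAMELEN);
            replicationFeedSlaves(0, argv, 5);
            decrRefCount(argv[2]);
            decrRefCount(argv[4]);
        }
    }
}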

View File

@ -54,6 +54,7 @@ typedef struct clusterLink {
#define CLUSTER_NODE_EXTENSIONS_SUPPORTED 1024 /* This node supports extensions. */
#define CLUSTER_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
#define nodeIsMaster(n) ((n)->flags & CLUSTER_NODE_MASTER)
#define nodeIsSlave(n) ((n)->flags & CLUSTER_NODE_SLAVE)
#define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE)
#define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR))

View File

@ -851,7 +851,9 @@ struct COMMAND_ARG CLUSTER_SET_CONFIG_EPOCH_Args[] = {
#ifndef SKIP_CMD_HISTORY_TABLE
/* CLUSTER SETSLOT history */
#define CLUSTER_SETSLOT_History NULL
commandHistory CLUSTER_SETSLOT_History[] = {
{"8.0.0","Added the `TIMEOUT` option."},
};
#endif
#ifndef SKIP_CMD_TIPS_TABLE
@ -876,6 +878,7 @@ struct COMMAND_ARG CLUSTER_SETSLOT_subcommand_Subargs[] = {
struct COMMAND_ARG CLUSTER_SETSLOT_Args[] = {
{MAKE_ARG("slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("subcommand",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,4,NULL),.subargs=CLUSTER_SETSLOT_subcommand_Subargs},
{MAKE_ARG("timeout",ARG_TYPE_INTEGER,-1,"TIMEOUT",NULL,"8.0.0",CMD_ARG_OPTIONAL,0,NULL),.display_text="timeout"},
};
/********** CLUSTER SHARDS ********************/
@ -969,7 +972,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
{MAKE_CMD("reset","Resets a node.","O(N) where N is the number of known nodes. The command may execute a FLUSHALL as a side effect.","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_RESET_History,0,CLUSTER_RESET_Tips,0,clusterCommand,-2,CMD_ADMIN|CMD_STALE|CMD_NOSCRIPT,0,CLUSTER_RESET_Keyspecs,0,NULL,1),.args=CLUSTER_RESET_Args},
{MAKE_CMD("saveconfig","Forces a node to save the cluster configuration to disk.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SAVECONFIG_History,0,CLUSTER_SAVECONFIG_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SAVECONFIG_Keyspecs,0,NULL,0)},
{MAKE_CMD("set-config-epoch","Sets the configuration epoch for a new node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SET_CONFIG_EPOCH_History,0,CLUSTER_SET_CONFIG_EPOCH_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SET_CONFIG_EPOCH_Keyspecs,0,NULL,1),.args=CLUSTER_SET_CONFIG_EPOCH_Args},
{MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,0,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_SETSLOT_Args},
{MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,1,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE|CMD_MAY_REPLICATE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,3),.args=CLUSTER_SETSLOT_Args},
{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)},
{MAKE_CMD("slaves","Lists the replica nodes of a master node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args},
{MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER SHARDS`","7.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)},

View File

@ -7,10 +7,17 @@
"arity": -4,
"container": "CLUSTER",
"function": "clusterCommand",
"command_flags": [
"history": [
[
"8.0.0",
"Added the `TIMEOUT` option."
]
],
"command_flags": [
"NO_ASYNC_LOADING",
"ADMIN",
"STALE"
"STALE",
"MAY_REPLICATE"
],
"arguments": [
{
@ -45,6 +52,14 @@
"token": "STABLE"
}
]
},
{
"name": "timeout",
"display": "timeout",
"type": "integer",
"token": "TIMEOUT",
"optional": true,
"since": "8.0.0"
}
],
"reply_schema": {

View File

@ -873,8 +873,7 @@ NULL
server.aof_flush_sleep = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc >= 3) {
replicationFeedSlaves(server.slaves, -1,
c->argv + 2, c->argc - 2);
replicationFeedSlaves(-1, c->argv + 2, c->argc - 2);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"error") && c->argc == 3) {
sds errstr = sdsnewlen("-",1);

View File

@ -2086,7 +2086,7 @@ void resetClient(client *c) {
c->multibulklen = 0;
c->bulklen = -1;
c->slot = -1;
c->flags &= ~CLIENT_EXECUTING_COMMAND;
c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_PREREPL_DONE);
/* Make sure the duration has been recorded to some command. */
serverAssert(c->duration == 0);

View File

@ -3310,7 +3310,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
robj *argv[2];
argv[0] = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
argv[1] = &keyobj;
replicationFeedSlaves(server.slaves,dbid,argv,2);
replicationFeedSlaves(dbid,argv,2);
}
sdsfree(key);
decrRefCount(val);

View File

@ -434,7 +434,7 @@ void feedReplicationBuffer(char *s, size_t len) {
* received by our clients in order to create the replication stream.
* Instead if the instance is a replica and has sub-replicas attached, we use
* replicationFeedStreamFromMasterStream() */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
void replicationFeedSlaves(int dictid, robj **argv, int argc) {
int j, len;
char llstr[LONG_STR_SIZE];
@ -451,7 +451,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
if (server.repl_backlog == NULL && listLength(slaves) == 0) {
if (server.repl_backlog == NULL && listLength(server.slaves) == 0) {
/* We increment the repl_offset anyway, since we use that for tracking AOF fsyncs
* even when there's no replication active. This code will not be reached if AOF
* is also disabled. */
@ -460,7 +460,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
}
/* We can't have slaves attached and no backlog. */
serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
serverAssert(!(listLength(server.slaves) != 0 && server.repl_backlog == NULL));
/* Must install write handler for all replicas first before feeding
* replication stream. */
@ -1313,6 +1313,9 @@ int replicaPutOnline(client *slave) {
NULL);
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
replicationGetSlaveName(slave));
/* Replicate the slots being migrated/imported to the new replica */
clusterReplicateOpenSlots();
return 1;
}
@ -3619,8 +3622,8 @@ void unblockClientWaitingReplicas(client *c) {
updateStatsOnUnblock(c, 0, 0, 0);
}
/* Check if there are clients blocked in WAIT or WAITAOF that can be unblocked
* since we received enough ACKs from slaves. */
/* Check if there are clients blocked in WAIT, WAITAOF, or WAIT_PREREPL
* that can be unblocked since we received enough ACKs from replicas. */
void processClientsWaitingReplicas(void) {
long long last_offset = 0;
long long last_aof_offset = 0;
@ -3637,6 +3640,7 @@ void processClientsWaitingReplicas(void) {
client *c = ln->value;
int is_wait_aof = c->bstate.btype == BLOCKED_WAITAOF;
int is_wait_prerepl = c->bstate.btype == BLOCKED_WAIT_PREREPL;
if (is_wait_aof && c->bstate.numlocal && !server.aof_enabled) {
addReplyError(c, "WAITAOF cannot be used when numlocal is set but appendonly is disabled.");
@ -3686,6 +3690,8 @@ void processClientsWaitingReplicas(void) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, numlocal);
addReplyLongLong(c, numreplicas);
} else if (is_wait_prerepl) {
c->flags |= CLIENT_PREREPL_DONE;
} else {
addReplyLongLong(c, numreplicas);
}
@ -3788,8 +3794,7 @@ void replicationCron(void) {
if (!manual_failover_in_progress) {
ping_argv[0] = shared.ping;
replicationFeedSlaves(server.slaves, -1,
ping_argv, 1);
replicationFeedSlaves(-1, ping_argv, 1);
}
}

View File

@ -1612,7 +1612,7 @@ static void sendGetackToReplicas(void) {
argv[0] = shared.replconf;
argv[1] = shared.getack;
argv[2] = shared.special_asterick; /* Not used argument. */
replicationFeedSlaves(server.slaves, -1, argv, 3);
replicationFeedSlaves(-1, argv, 3);
}
extern int ProcessingEventsWhileBlocked;
@ -1999,6 +1999,10 @@ void createSharedObjects(void) {
shared.special_asterick = createStringObject("*",1);
shared.special_equals = createStringObject("=",1);
shared.redacted = makeObjectShared(createStringObject("(redacted)",10));
shared.cluster = createStringObject("CLUSTER", 7);
shared.setslot = createStringObject("SETSLOT", 7);
shared.importing = createStringObject("IMPORTING", 9);
shared.migrating = createStringObject("MIGRATING", 9);
for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
shared.integers[j] =
@ -3314,7 +3318,7 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) {
if (server.aof_state != AOF_OFF && target & PROPAGATE_AOF)
feedAppendOnlyFile(dbid,argv,argc);
if (target & PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
replicationFeedSlaves(dbid,argv,argc);
}
/* Used inside commands to schedule the propagation of additional commands

View File

@ -402,6 +402,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */
#define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */
#define CLIENT_PREREPL_DONE (1ULL<<51) /* Indicate that pre-replication has been done on the client */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
@ -415,6 +416,7 @@ typedef enum blocking_type {
BLOCKED_ZSET, /* BZPOP et al. */
BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */
BLOCKED_SHUTDOWN, /* SHUTDOWN. */
BLOCKED_WAIT_PREREPL, /* WAIT for pre-replication and then run the command. */
BLOCKED_NUM, /* Number of blocked states. */
BLOCKED_END /* End of enumeration */
} blocking_type;
@ -1334,7 +1336,7 @@ struct sharedObjectsStruct {
*time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread,
*lastid, *ping, *setid, *keepttl, *load, *createconsumer,
*getack, *special_asterick, *special_equals, *default_username, *redacted,
*ssubscribebulk,*sunsubscribebulk, *smessagebulk,
*ssubscribebulk,*sunsubscribebulk, *smessagebulk, *cluster, *setslot, *importing, *migrating,
*select[PROTO_SHARED_SELECT_CMDS],
*integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
@ -2820,7 +2822,7 @@ ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout);
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
/* Replication */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
void replicationFeedSlaves(int dictid, robj **argv, int argc);
void replicationFeedStreamFromMasterStream(char *buf, size_t buflen);
void resetReplicationBuffer(void);
void feedReplicationBuffer(char *buf, size_t len);
@ -3433,7 +3435,9 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
void blockClientShutdown(client *c);
void blockPostponeClient(client *c);
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas);
void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas);
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas);
void replicationRequestAckFromSlaves(void);
void signalDeletedKeyAsReady(serverDb *db, robj *key, int type);
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors);
void scanDatabaseForDeletedKeys(serverDb *emptied, serverDb *replaced_with);

View File

@ -5,10 +5,6 @@
# 4. migration is half finished on "migrating" node
# 5. migration is half finished on "importing" node
# TODO: Test is currently disabled until it is stabilized (fixing the test
# itself or real issues in the server).
if {false} {
source "../tests/includes/init-tests.tcl"
source "../tests/includes/utils.tcl"
@ -95,4 +91,3 @@ test "Half-finish importing" {
}
config_set_all_nodes cluster-allow-replica-migration yes
}

View File

@ -1,10 +1,5 @@
# Tests for many simultaneous migrations.
# TODO: Test is currently disabled until it is stabilized (fixing the test
# itself or real issues in the server).
if {false} {
source "../tests/includes/init-tests.tcl"
source "../tests/includes/utils.tcl"
@ -61,4 +56,3 @@ test "Keys are accessible" {
}
config_set_all_nodes cluster-allow-replica-migration yes
}

View File

@ -317,13 +317,10 @@ test {Migrate the last slot away from a node using valkey-cli} {
catch { $newnode_r get foo } e
assert_equal "MOVED $slot $owner_host:$owner_port" $e
# Check that the empty node has turned itself into a replica of the new
# owner and that the new owner knows that.
wait_for_condition 1000 50 {
[string match "*slave*" [$owner_r CLUSTER REPLICAS $owner_id]]
} else {
fail "Empty node didn't turn itself into a replica."
}
# Check that the now empty primary node doesn't turn itself into
# a replica of any other nodes
wait_for_cluster_propagation
assert_match *master* [$owner_r role]
}
}

View File

@ -146,18 +146,18 @@ test "Verify the nodes configured with prefer hostname only show hostname for ne
# to accept our isolated nodes connections. At this point they will
# start showing up in cluster slots.
wait_for_condition 50 100 {
[llength [R 6 CLUSTER SLOTS]] eq 2
[llength [R 6 CLUSTER SLOTS]] eq 3
} else {
fail "Node did not learn about the 2 shards it can talk to"
}
wait_for_condition 50 100 {
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 0 2 3] 1] eq "shard-1.com"
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-1.com"
} else {
fail "hostname for shard-1 didn't reach node 6"
}
wait_for_condition 50 100 {
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-2.com"
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 2 2 3] 1] eq "shard-2.com"
} else {
fail "hostname for shard-2 didn't reach node 6"
}

View File

@ -0,0 +1,357 @@
proc get_open_slots {srv_idx} {
set slots [dict get [cluster_get_myself $srv_idx] slots]
if {[regexp {\[.*} $slots slots]} {
set slots [regsub -all {[{}]} $slots ""]
return $slots
} else {
return {}
}
}
proc get_cluster_role {srv_idx} {
set flags [dict get [cluster_get_myself $srv_idx] flags]
set role [lindex $flags 1]
return $role
}
proc wait_for_role {srv_idx role} {
set node_timeout [lindex [R 0 CONFIG GET cluster-node-timeout] 1]
# wait for a gossip cycle for states to be propagated throughout the cluster
after $node_timeout
wait_for_condition 100 100 {
[lindex [split [R $srv_idx ROLE] " "] 0] eq $role
} else {
fail "R $srv_idx didn't assume the replication $role in time"
}
wait_for_condition 100 100 {
[get_cluster_role $srv_idx] eq $role
} else {
fail "R $srv_idx didn't assume the cluster $role in time"
}
wait_for_cluster_propagation
}
proc wait_for_slot_state {srv_idx pattern} {
wait_for_condition 100 100 {
[get_open_slots $srv_idx] eq $pattern
} else {
fail "incorrect slot state on R $srv_idx: expected $pattern; got [get_open_slots $srv_idx]"
}
}
# Check if the server responds with "PONG"
proc check_server_response {server_id} {
# Send a PING command and check if the response is "PONG"
return [expr {[catch {R $server_id PING} result] == 0 && $result eq "PONG"}]
}
# restart a server and wait for it to come back online
proc restart_server_and_wait {server_id} {
set node_timeout [lindex [R 0 CONFIG GET cluster-node-timeout] 1]
set result [catch {R $server_id DEBUG RESTART [expr 3*$node_timeout]} err]
# Check if the error is the expected "I/O error reading reply"
if {$result != 0 && $err ne "I/O error reading reply"} {
fail "Unexpected error restarting server $server_id: $err"
}
wait_for_condition 100 100 {
[check_server_response $server_id] eq 1
} else {
fail "Server $server_id didn't come back online in time"
}
}
start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica-migration no cluster-node-timeout 1000} } {
set node_timeout [lindex [R 0 CONFIG GET cluster-node-timeout] 1]
set R0_id [R 0 CLUSTER MYID]
set R1_id [R 1 CLUSTER MYID]
set R2_id [R 2 CLUSTER MYID]
set R3_id [R 3 CLUSTER MYID]
set R4_id [R 4 CLUSTER MYID]
set R5_id [R 5 CLUSTER MYID]
test "Slot migration states are replicated" {
# Validate initial states
assert_not_equal [get_open_slots 0] "\[609->-$R1_id\]"
assert_not_equal [get_open_slots 1] "\[609-<-$R0_id\]"
assert_not_equal [get_open_slots 3] "\[609->-$R1_id\]"
assert_not_equal [get_open_slots 4] "\[609-<-$R0_id\]"
# Kick off the migration of slot 609 from R0 to R1
assert_equal {OK} [R 0 CLUSTER SETSLOT 609 MIGRATING $R1_id]
assert_equal {OK} [R 1 CLUSTER SETSLOT 609 IMPORTING $R0_id]
# Validate that R0 is migrating slot 609 to R1
assert_equal [get_open_slots 0] "\[609->-$R1_id\]"
# Validate that R1 is importing slot 609 from R0
assert_equal [get_open_slots 1] "\[609-<-$R0_id\]"
# Validate final states
wait_for_slot_state 0 "\[609->-$R1_id\]"
wait_for_slot_state 1 "\[609-<-$R0_id\]"
wait_for_slot_state 3 "\[609->-$R1_id\]"
wait_for_slot_state 4 "\[609-<-$R0_id\]"
}
test "Migration target is auto-updated after failover in target shard" {
# Restart R1 to trigger an auto-failover to R4
restart_server_and_wait 1
# Wait for R1 to become a replica
wait_for_role 1 slave
# Validate final states
wait_for_slot_state 0 "\[609->-$R4_id\]"
wait_for_slot_state 1 "\[609-<-$R0_id\]"
wait_for_slot_state 3 "\[609->-$R4_id\]"
wait_for_slot_state 4 "\[609-<-$R0_id\]"
# Restore R1's primaryship
assert_equal {OK} [R 1 cluster failover]
wait_for_role 1 master
# Validate initial states
wait_for_slot_state 0 "\[609->-$R1_id\]"
wait_for_slot_state 1 "\[609-<-$R0_id\]"
wait_for_slot_state 3 "\[609->-$R1_id\]"
wait_for_slot_state 4 "\[609-<-$R0_id\]"
}
test "Migration source is auto-updated after failover in source shard" {
# Restart R0 to trigger an auto-failover to R3
restart_server_and_wait 0
# Wait for R0 to become a replica
wait_for_role 0 slave
# Validate final states
wait_for_slot_state 0 "\[609->-$R1_id\]"
wait_for_slot_state 1 "\[609-<-$R3_id\]"
wait_for_slot_state 3 "\[609->-$R1_id\]"
wait_for_slot_state 4 "\[609-<-$R3_id\]"
# Restore R0's primaryship
assert_equal {OK} [R 0 cluster failover]
wait_for_role 0 master
# Validate final states
wait_for_slot_state 0 "\[609->-$R1_id\]"
wait_for_slot_state 1 "\[609-<-$R0_id\]"
wait_for_slot_state 3 "\[609->-$R1_id\]"
wait_for_slot_state 4 "\[609-<-$R0_id\]"
}
test "Replica redirects key access in migrating slots" {
# Validate initial states
assert_equal [get_open_slots 0] "\[609->-$R1_id\]"
assert_equal [get_open_slots 1] "\[609-<-$R0_id\]"
assert_equal [get_open_slots 3] "\[609->-$R1_id\]"
assert_equal [get_open_slots 4] "\[609-<-$R0_id\]"
catch {[R 3 get aga]} e
assert_equal {MOVED} [lindex [split $e] 0]
assert_equal {609} [lindex [split $e] 1]
}
test "New replica inherits migrating slot" {
# Reset R3 to turn it into an empty node
assert_equal [get_open_slots 3] "\[609->-$R1_id\]"
assert_equal {OK} [R 3 CLUSTER RESET]
assert_not_equal [get_open_slots 3] "\[609->-$R1_id\]"
# Add R3 back as a replica of R0
assert_equal {OK} [R 3 CLUSTER MEET [srv 0 "host"] [srv 0 "port"]]
wait_for_role 0 master
assert_equal {OK} [R 3 CLUSTER REPLICATE $R0_id]
wait_for_role 3 slave
# Validate that R3 now sees slot 609 open
assert_equal [get_open_slots 3] "\[609->-$R1_id\]"
}
test "New replica inherits importing slot" {
# Reset R4 to turn it into an empty node
assert_equal [get_open_slots 4] "\[609-<-$R0_id\]"
assert_equal {OK} [R 4 CLUSTER RESET]
assert_not_equal [get_open_slots 4] "\[609-<-$R0_id\]"
# Add R4 back as a replica of R1
assert_equal {OK} [R 4 CLUSTER MEET [srv -1 "host"] [srv -1 "port"]]
wait_for_role 1 master
assert_equal {OK} [R 4 CLUSTER REPLICATE $R1_id]
wait_for_role 4 slave
# Validate that R4 now sees slot 609 open
assert_equal [get_open_slots 4] "\[609-<-$R0_id\]"
}
}
proc create_empty_shard {p r} {
set node_timeout [lindex [R 0 CONFIG GET cluster-node-timeout] 1]
assert_equal {OK} [R $p CLUSTER RESET]
assert_equal {OK} [R $r CLUSTER RESET]
assert_equal {OK} [R $p CLUSTER MEET [srv 0 "host"] [srv 0 "port"]]
assert_equal {OK} [R $r CLUSTER MEET [srv 0 "host"] [srv 0 "port"]]
wait_for_role $p master
assert_equal {OK} [R $r CLUSTER REPLICATE [R $p CLUSTER MYID]]
wait_for_role $r slave
wait_for_role $p master
}
start_cluster 3 5 {tags {external:skip cluster} overrides {cluster-allow-replica-migration no cluster-node-timeout 1000} } {
set node_timeout [lindex [R 0 CONFIG GET cluster-node-timeout] 1]
set R0_id [R 0 CLUSTER MYID]
set R1_id [R 1 CLUSTER MYID]
set R2_id [R 2 CLUSTER MYID]
set R3_id [R 3 CLUSTER MYID]
set R4_id [R 4 CLUSTER MYID]
set R5_id [R 5 CLUSTER MYID]
create_empty_shard 6 7
set R6_id [R 6 CLUSTER MYID]
set R7_id [R 7 CLUSTER MYID]
test "Empty-shard migration replicates slot importing states" {
# Validate initial states
assert_not_equal [get_open_slots 0] "\[609->-$R6_id\]"
assert_not_equal [get_open_slots 6] "\[609-<-$R0_id\]"
assert_not_equal [get_open_slots 3] "\[609->-$R6_id\]"
assert_not_equal [get_open_slots 7] "\[609-<-$R0_id\]"
# Kick off the migration of slot 609 from R0 to R6
assert_equal {OK} [R 0 CLUSTER SETSLOT 609 MIGRATING $R6_id]
assert_equal {OK} [R 6 CLUSTER SETSLOT 609 IMPORTING $R0_id]
# Validate that R0 is migrating slot 609 to R6
assert_equal [get_open_slots 0] "\[609->-$R6_id\]"
# Validate that R6 is importing slot 609 from R0
assert_equal [get_open_slots 6] "\[609-<-$R0_id\]"
# Validate final states
wait_for_slot_state 0 "\[609->-$R6_id\]"
wait_for_slot_state 6 "\[609-<-$R0_id\]"
wait_for_slot_state 3 "\[609->-$R6_id\]"
wait_for_slot_state 7 "\[609-<-$R0_id\]"
}
test "Empty-shard migration target is auto-updated after faiover in target shard" {
wait_for_role 6 master
# Restart R6 to trigger an auto-failover to R7
restart_server_and_wait 6
# Wait for R6 to become a replica
wait_for_role 6 slave
# Validate final states
wait_for_slot_state 0 "\[609->-$R7_id\]"
wait_for_slot_state 6 "\[609-<-$R0_id\]"
wait_for_slot_state 3 "\[609->-$R7_id\]"
wait_for_slot_state 7 "\[609-<-$R0_id\]"
# Restore R6's primaryship
assert_equal {OK} [R 6 cluster failover]
wait_for_role 6 master
# Validate final states
wait_for_slot_state 0 "\[609->-$R6_id\]"
wait_for_slot_state 6 "\[609-<-$R0_id\]"
wait_for_slot_state 3 "\[609->-$R6_id\]"
wait_for_slot_state 7 "\[609-<-$R0_id\]"
}
test "Empty-shard migration source is auto-updated after faiover in source shard" {
wait_for_role 0 master
# Restart R0 to trigger an auto-failover to R3
restart_server_and_wait 0
# Wait for R0 to become a replica
wait_for_role 0 slave
# Validate final states
wait_for_slot_state 0 "\[609->-$R6_id\]"
wait_for_slot_state 6 "\[609-<-$R3_id\]"
wait_for_slot_state 3 "\[609->-$R6_id\]"
wait_for_slot_state 7 "\[609-<-$R3_id\]"
# Restore R0's primaryship
assert_equal {OK} [R 0 cluster failover]
wait_for_role 0 master
# Validate final states
wait_for_slot_state 0 "\[609->-$R6_id\]"
wait_for_slot_state 6 "\[609-<-$R0_id\]"
wait_for_slot_state 3 "\[609->-$R6_id\]"
wait_for_slot_state 7 "\[609-<-$R0_id\]"
}
}
proc migrate_slot {from to slot} {
set from_id [R $from CLUSTER MYID]
set to_id [R $to CLUSTER MYID]
assert_equal {OK} [R $from CLUSTER SETSLOT $slot MIGRATING $to_id]
assert_equal {OK} [R $to CLUSTER SETSLOT $slot IMPORTING $from_id]
}
start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica-migration no cluster-node-timeout 1000} } {
set node_timeout [lindex [R 0 CONFIG GET cluster-node-timeout] 1]
set R0_id [R 0 CLUSTER MYID]
set R1_id [R 1 CLUSTER MYID]
set R2_id [R 2 CLUSTER MYID]
set R3_id [R 3 CLUSTER MYID]
set R4_id [R 4 CLUSTER MYID]
set R5_id [R 5 CLUSTER MYID]
test "Multiple slot migration states are replicated" {
migrate_slot 0 1 13
migrate_slot 0 1 7
migrate_slot 0 1 17
# Validate final states
wait_for_slot_state 0 "\[7->-$R1_id\] \[13->-$R1_id\] \[17->-$R1_id\]"
wait_for_slot_state 1 "\[7-<-$R0_id\] \[13-<-$R0_id\] \[17-<-$R0_id\]"
wait_for_slot_state 3 "\[7->-$R1_id\] \[13->-$R1_id\] \[17->-$R1_id\]"
wait_for_slot_state 4 "\[7-<-$R0_id\] \[13-<-$R0_id\] \[17-<-$R0_id\]"
}
test "New replica inherits multiple migrating slots" {
# Reset R3 to turn it into an empty node
assert_equal {OK} [R 3 CLUSTER RESET]
# Add R3 back as a replica of R0
assert_equal {OK} [R 3 CLUSTER MEET [srv 0 "host"] [srv 0 "port"]]
wait_for_role 0 master
assert_equal {OK} [R 3 CLUSTER REPLICATE $R0_id]
wait_for_role 3 slave
# Validate final states
wait_for_slot_state 3 "\[7->-$R1_id\] \[13->-$R1_id\] \[17->-$R1_id\]"
}
test "Slot finalization succeeds on both primary and replicas" {
assert_equal {OK} [R 1 CLUSTER SETSLOT 7 NODE $R1_id]
wait_for_slot_state 1 "\[13-<-$R0_id\] \[17-<-$R0_id\]"
wait_for_slot_state 4 "\[13-<-$R0_id\] \[17-<-$R0_id\]"
assert_equal {OK} [R 1 CLUSTER SETSLOT 13 NODE $R1_id]
wait_for_slot_state 1 "\[17-<-$R0_id\]"
wait_for_slot_state 4 "\[17-<-$R0_id\]"
assert_equal {OK} [R 1 CLUSTER SETSLOT 17 NODE $R1_id]
wait_for_slot_state 1 ""
wait_for_slot_state 4 ""
}
}
start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica-migration no cluster-node-timeout 1000} } {
set node_timeout [lindex [R 0 CONFIG GET cluster-node-timeout] 1]
set R0_id [R 0 CLUSTER MYID]
set R1_id [R 1 CLUSTER MYID]
test "Slot is auto-claimed by target after source relinquishes ownership" {
migrate_slot 0 1 609
# Validate that R1 doesn't own slot 609
catch {[R 1 get aga]} e
assert_equal {MOVED} [lindex [split $e] 0]
# Finalize the slot on the source first
assert_equal {OK} [R 0 CLUSTER SETSLOT 609 NODE $R1_id]
after $node_timeout
# R1 should claim slot 609 since it is still importing slot 609
# from R0 but R0 no longer owns this slot
assert_equal {OK} [R 1 set aga foo]
}
}
start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica-migration no cluster-node-timeout 1000} } {
set R1_id [R 1 CLUSTER MYID]
test "CLUSTER SETSLOT with an explicit timeout" {
# Simulate a replica crash
catch {R 3 DEBUG SEGFAULT} e
# Setslot with an explicit 3 second timeout
set start_time [clock milliseconds]
catch {R 0 CLUSTER SETSLOT 609 MIGRATING $R1_id TIMEOUT 3000} e
set end_time [clock milliseconds]
set duration [expr {$end_time - $start_time}]
# Assert that the execution time is greater than the default 2s timeout
assert {$duration > 2000}
# Setslot should fail with not enough good replicas to write after the timeout
assert_equal {NOREPLICAS Not enough good replicas to write.} $e
}
}

View File

@ -1672,8 +1672,10 @@ aof-timestamp-enabled no
# cluster-migration-barrier 1
# Turning off this option allows for less automatic cluster configuration.
# It both disables migration to orphaned masters and migration from masters
# that became empty.
# It disables migration of replicas to orphaned masters. Masters that become
# empty due to losing their last slots to another master will not automatically
# replicate from the master that took over their last slots. Instead, they will
# remain as empty masters without any slots.
#
# Default is 'yes' (allow automatic migrations).
#