Sharded pubsub command execution within multi/exec (#13)
Allow SPUBLISH command within multi/exec on replica.
This commit is contained in:
parent
699f62e8e3
commit
c120a45874
@ -1030,9 +1030,11 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
||||
mc.cmd = cmd;
|
||||
}
|
||||
|
||||
int is_pubsubshard = cmd->proc == ssubscribeCommand ||
|
||||
cmd->proc == sunsubscribeCommand ||
|
||||
cmd->proc == spublishCommand;
|
||||
uint64_t cmd_flags = getCommandFlags(c);
|
||||
|
||||
/* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
|
||||
int pubsubshard_included = (cmd_flags & CMD_PUBSUB) ||
|
||||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_PUBSUB));
|
||||
|
||||
/* Check that all the keys are in the same hash slot, and obtain this
|
||||
* slot and the node associated. */
|
||||
@ -1109,7 +1111,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
||||
* node until the migration completes with CLUSTER SETSLOT <slot>
|
||||
* NODE <node-id>. */
|
||||
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
|
||||
if ((migrating_slot || importing_slot) && !is_pubsubshard)
|
||||
if ((migrating_slot || importing_slot) && !pubsubshard_included)
|
||||
{
|
||||
if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++;
|
||||
else existing_keys++;
|
||||
@ -1122,11 +1124,10 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
||||
* without redirections or errors in all the cases. */
|
||||
if (n == NULL) return myself;
|
||||
|
||||
uint64_t cmd_flags = getCommandFlags(c);
|
||||
/* Cluster is globally down but we got keys? We only serve the request
|
||||
* if it is a read command and when allow_reads_when_down is enabled. */
|
||||
if (!isClusterHealthy()) {
|
||||
if (is_pubsubshard) {
|
||||
if (pubsubshard_included) {
|
||||
if (!server.cluster_allow_pubsubshard_when_down) {
|
||||
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
|
||||
return NULL;
|
||||
@ -1189,7 +1190,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
||||
* is serving, we can reply without redirection. */
|
||||
int is_write_command = (cmd_flags & CMD_WRITE) ||
|
||||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
|
||||
if (((c->flags & CLIENT_READONLY) || is_pubsubshard) &&
|
||||
if (((c->flags & CLIENT_READONLY) || pubsubshard_included) &&
|
||||
!is_write_command &&
|
||||
clusterNodeIsSlave(myself) &&
|
||||
clusterNodeGetSlaveof(myself) == n)
|
||||
|
56
tests/unit/cluster/sharded-pubsub.tcl
Normal file
56
tests/unit/cluster/sharded-pubsub.tcl
Normal file
@ -0,0 +1,56 @@
|
||||
start_cluster 1 1 {tags {external:skip cluster}} {
|
||||
set primary_id 0
|
||||
set replica1_id 1
|
||||
|
||||
set primary [Rn $primary_id]
|
||||
set replica [Rn $replica1_id]
|
||||
|
||||
test "Sharded pubsub publish behavior within multi/exec" {
|
||||
foreach {node} {primary replica} {
|
||||
set node [set $node]
|
||||
$node MULTI
|
||||
$node SPUBLISH ch1 "hello"
|
||||
$node EXEC
|
||||
}
|
||||
}
|
||||
|
||||
test "Sharded pubsub within multi/exec with cross slot operation" {
|
||||
$primary MULTI
|
||||
$primary SPUBLISH ch1 "hello"
|
||||
$primary GET foo
|
||||
catch {[$primary EXEC]} err
|
||||
assert_match {CROSSSLOT*} $err
|
||||
}
|
||||
|
||||
test "Sharded pubsub publish behavior within multi/exec with read operation on primary" {
|
||||
$primary MULTI
|
||||
$primary SPUBLISH foo "hello"
|
||||
$primary GET foo
|
||||
$primary EXEC
|
||||
} {0 {}}
|
||||
|
||||
test "Sharded pubsub publish behavior within multi/exec with read operation on replica" {
|
||||
$replica MULTI
|
||||
$replica SPUBLISH foo "hello"
|
||||
catch {[$replica GET foo]} err
|
||||
assert_match {MOVED*} $err
|
||||
catch {[$replica EXEC]} err
|
||||
assert_match {EXECABORT*} $err
|
||||
}
|
||||
|
||||
test "Sharded pubsub publish behavior within multi/exec with write operation on primary" {
|
||||
$primary MULTI
|
||||
$primary SPUBLISH foo "hello"
|
||||
$primary SET foo bar
|
||||
$primary EXEC
|
||||
} {0 OK}
|
||||
|
||||
test "Sharded pubsub publish behavior within multi/exec with write operation on replica" {
|
||||
$replica MULTI
|
||||
$replica SPUBLISH foo "hello"
|
||||
catch {[$replica SET foo bar]} err
|
||||
assert_match {MOVED*} $err
|
||||
catch {[$replica EXEC]} err
|
||||
assert_match {EXECABORT*} $err
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user