support XREAD[GROUP] with BLOCK option in scripts (#12596)

In #11568 we removed the NOSCRIPT flag from commands and keep the BLOCKING flag.
Aiming to allow them in scripts and let them implicitly behave in the non-blocking way.

In that sense, the old behavior was to allow LPOP and reject BLPOP, and the new behavior,
is to allow BLPOP too, and fail it only in case it ends up blocking.
So likewise, so far we allowed XREAD and rejected XREAD BLOCK, and we will now allow
that too, and only reject it if it ends up blocking.
This commit is contained in:
zhaozhao.zz 2023-10-12 15:54:50 +08:00 committed by GitHub
parent e5ef161374
commit 77a65e82b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 26 additions and 14 deletions

View File

@ -10861,7 +10861,7 @@ struct COMMAND_STRUCT redisCommandTable[] = {
{MAKE_CMD("xlen","Return the number of messages in a stream.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XLEN_History,0,XLEN_Tips,0,xlenCommand,2,CMD_READONLY|CMD_FAST,ACL_CATEGORY_STREAM,XLEN_Keyspecs,1,NULL,1),.args=XLEN_Args}, {MAKE_CMD("xlen","Return the number of messages in a stream.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XLEN_History,0,XLEN_Tips,0,xlenCommand,2,CMD_READONLY|CMD_FAST,ACL_CATEGORY_STREAM,XLEN_Keyspecs,1,NULL,1),.args=XLEN_Args},
{MAKE_CMD("xpending","Returns the information and entries from a stream consumer group's pending entries list.","O(N) with N being the number of elements returned, so asking for a small fixed number of entries per call is O(1). O(M), where M is the total number of entries scanned when used with the IDLE filter. When the command returns just the summary and the list of consumers is small, it runs in O(1) time; otherwise, an additional O(N) time for iterating every consumer.","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XPENDING_History,1,XPENDING_Tips,1,xpendingCommand,-3,CMD_READONLY,ACL_CATEGORY_STREAM,XPENDING_Keyspecs,1,NULL,3),.args=XPENDING_Args}, {MAKE_CMD("xpending","Returns the information and entries from a stream consumer group's pending entries list.","O(N) with N being the number of elements returned, so asking for a small fixed number of entries per call is O(1). O(M), where M is the total number of entries scanned when used with the IDLE filter. When the command returns just the summary and the list of consumers is small, it runs in O(1) time; otherwise, an additional O(N) time for iterating every consumer.","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XPENDING_History,1,XPENDING_Tips,1,xpendingCommand,-3,CMD_READONLY,ACL_CATEGORY_STREAM,XPENDING_Keyspecs,1,NULL,3),.args=XPENDING_Args},
{MAKE_CMD("xrange","Returns the messages from a stream within a range of IDs.","O(N) with N being the number of elements being returned. If N is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1).","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XRANGE_History,1,XRANGE_Tips,0,xrangeCommand,-4,CMD_READONLY,ACL_CATEGORY_STREAM,XRANGE_Keyspecs,1,NULL,4),.args=XRANGE_Args}, {MAKE_CMD("xrange","Returns the messages from a stream within a range of IDs.","O(N) with N being the number of elements being returned. If N is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1).","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XRANGE_History,1,XRANGE_Tips,0,xrangeCommand,-4,CMD_READONLY,ACL_CATEGORY_STREAM,XRANGE_Keyspecs,1,NULL,4),.args=XRANGE_Args},
{MAKE_CMD("xread","Returns messages from multiple streams with IDs greater than the ones requested. Blocks until a message is available otherwise.",NULL,"5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XREAD_History,0,XREAD_Tips,0,xreadCommand,-4,CMD_BLOCKING|CMD_READONLY|CMD_BLOCKING,ACL_CATEGORY_STREAM,XREAD_Keyspecs,1,xreadGetKeys,3),.args=XREAD_Args}, {MAKE_CMD("xread","Returns messages from multiple streams with IDs greater than the ones requested. Blocks until a message is available otherwise.",NULL,"5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XREAD_History,0,XREAD_Tips,0,xreadCommand,-4,CMD_BLOCKING|CMD_READONLY,ACL_CATEGORY_STREAM,XREAD_Keyspecs,1,xreadGetKeys,3),.args=XREAD_Args},
{MAKE_CMD("xreadgroup","Returns new or historical messages from a stream for a consumer in a group. Blocks until a message is available otherwise.","For each stream mentioned: O(M) with M being the number of elements returned. If M is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1). On the other side when XREADGROUP blocks, XADD will pay the O(N) time in order to serve the N clients blocked on the stream getting new data.","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XREADGROUP_History,0,XREADGROUP_Tips,0,xreadCommand,-7,CMD_BLOCKING|CMD_WRITE,ACL_CATEGORY_STREAM,XREADGROUP_Keyspecs,1,xreadGetKeys,5),.args=XREADGROUP_Args}, {MAKE_CMD("xreadgroup","Returns new or historical messages from a stream for a consumer in a group. Blocks until a message is available otherwise.","For each stream mentioned: O(M) with M being the number of elements returned. If M is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1). On the other side when XREADGROUP blocks, XADD will pay the O(N) time in order to serve the N clients blocked on the stream getting new data.","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XREADGROUP_History,0,XREADGROUP_Tips,0,xreadCommand,-7,CMD_BLOCKING|CMD_WRITE,ACL_CATEGORY_STREAM,XREADGROUP_Keyspecs,1,xreadGetKeys,5),.args=XREADGROUP_Args},
{MAKE_CMD("xrevrange","Returns the messages from a stream within a range of IDs in reverse order.","O(N) with N being the number of elements returned. If N is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1).","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XREVRANGE_History,1,XREVRANGE_Tips,0,xrevrangeCommand,-4,CMD_READONLY,ACL_CATEGORY_STREAM,XREVRANGE_Keyspecs,1,NULL,4),.args=XREVRANGE_Args}, {MAKE_CMD("xrevrange","Returns the messages from a stream within a range of IDs in reverse order.","O(N) with N being the number of elements returned. If N is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1).","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XREVRANGE_History,1,XREVRANGE_Tips,0,xrevrangeCommand,-4,CMD_READONLY,ACL_CATEGORY_STREAM,XREVRANGE_Keyspecs,1,NULL,4),.args=XREVRANGE_Args},
{MAKE_CMD("xsetid","An internal command for replicating stream values.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XSETID_History,1,XSETID_Tips,0,xsetidCommand,-3,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_STREAM,XSETID_Keyspecs,1,NULL,4),.args=XSETID_Args}, {MAKE_CMD("xsetid","An internal command for replicating stream values.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"stream",COMMAND_GROUP_STREAM,XSETID_History,1,XSETID_Tips,0,xsetidCommand,-3,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_STREAM,XSETID_Keyspecs,1,NULL,4),.args=XSETID_Args},

View File

@ -8,8 +8,7 @@
"get_keys_function": "xreadGetKeys", "get_keys_function": "xreadGetKeys",
"command_flags": [ "command_flags": [
"BLOCKING", "BLOCKING",
"READONLY", "READONLY"
"BLOCKING"
], ],
"acl_categories": [ "acl_categories": [
"STREAM" "STREAM"

View File

@ -2188,14 +2188,6 @@ void xreadCommand(client *c) {
int moreargs = c->argc-i-1; int moreargs = c->argc-i-1;
char *o = c->argv[i]->ptr; char *o = c->argv[i]->ptr;
if (!strcasecmp(o,"BLOCK") && moreargs) { if (!strcasecmp(o,"BLOCK") && moreargs) {
if (c->flags & CLIENT_SCRIPT) {
/*
* Although the CLIENT_DENY_BLOCKING flag should protect from blocking the client
* on Lua/MULTI/RM_Call we want special treatment for Lua to keep backward compatibility.
* There is no sense to use BLOCK option within Lua. */
addReplyErrorFormat(c, "%s command is not allowed with BLOCK option from scripts", (char *)c->argv[0]->ptr);
return;
}
i++; i++;
if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout, if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout,
UNIT_MILLISECONDS) != C_OK) return; UNIT_MILLISECONDS) != C_OK) return;

View File

@ -257,17 +257,38 @@ start_server {tags {"scripting"}} {
run_script {return redis.pcall('wait','1','0')} 0 run_script {return redis.pcall('wait','1','0')} 0
} {0} } {0}
test {EVAL - Scripts can't run XREAD and XREADGROUP with BLOCK option} { test {EVAL - Scripts do not block on XREAD with BLOCK option} {
r del s r del s
r xgroup create s g $ MKSTREAM r xgroup create s g $ MKSTREAM
set res [run_script {return redis.pcall('xread','STREAMS','s','$')} 1 s] set res [run_script {return redis.pcall('xread','STREAMS','s','$')} 1 s]
assert {$res eq {}} assert {$res eq {}}
assert_error "*xread command is not allowed with BLOCK option from scripts" {run_script {return redis.pcall('xread','BLOCK',0,'STREAMS','s','$')} 1 s} run_script {return redis.pcall('xread','BLOCK',0,'STREAMS','s','$')} 1 s
} {}
test {EVAL - Scripts do not block on XREADGROUP with BLOCK option} {
set res [run_script {return redis.pcall('xreadgroup','group','g','c','STREAMS','s','>')} 1 s] set res [run_script {return redis.pcall('xreadgroup','group','g','c','STREAMS','s','>')} 1 s]
assert {$res eq {}} assert {$res eq {}}
assert_error "*xreadgroup command is not allowed with BLOCK option from scripts" {run_script {return redis.pcall('xreadgroup','group','g','c','BLOCK',0,'STREAMS','s','>')} 1 s} run_script {return redis.pcall('xreadgroup','group','g','c','BLOCK',0,'STREAMS','s','>')} 1 s
} {}
test {EVAL - Scripts do not block on XREAD with BLOCK option -- non empty stream} {
r XADD s * a 1
set res [run_script {return redis.pcall('xread','BLOCK',0,'STREAMS','s','$')} 1 s]
assert {$res eq {}}
set res [run_script {return redis.pcall('xread','BLOCK',0,'STREAMS','s','0-0')} 1 s]
assert {[lrange [lindex $res 0 1 0 1] 0 1] eq {a 1}}
} }
test {EVAL - Scripts do not block on XREADGROUP with BLOCK option -- non empty stream} {
r XADD s * b 2
set res [
run_script {return redis.pcall('xreadgroup','group','g','c','BLOCK',0,'STREAMS','s','>')} 1 s
]
assert {[llength [lindex $res 0 1]] == 2}
lindex $res 0 1 0 1
} {a 1}
test {EVAL - Scripts can run non-deterministic commands} { test {EVAL - Scripts can run non-deterministic commands} {
set e {} set e {}
catch { catch {