Adding module api for processing commands during busy jobs and allow flagging the commands that should be handled at this status (#9963)
Some modules might perform a long-running logic in different stages of Redis lifetime, for example: * command execution * RDB loading * thread safe context During this long-running logic Redis is not responsive. This PR offers 1. An API to process events while a busy command is running (`RM_Yield`) 2. A new flag (`ALLOW_BUSY`) to mark the commands that should be handled during busy jobs which can also be used by modules (`allow-busy`) 3. In slow commands and thread safe contexts, this flag will start rejecting commands with -BUSY only after `busy-reply-threshold` 4. During loading (`rdb_load` callback), it'll process events right away (not wait for `busy-reply-threshold`), but either way, the processing is throttled to the server hz rate. 5. Allow modules to Yield to redis background tasks, but not to client commands * rename `script-time-limit` to `busy-reply-threshold` (an alias to the pre-7.0 `lua-time-limit`) Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
parent
22172a4aa6
commit
c4b788230c
32
redis.conf
32
redis.conf
@ -1485,23 +1485,29 @@ aof-timestamp-enabled no
|
||||
#
|
||||
# shutdown-timeout 10
|
||||
|
||||
################################ LUA SCRIPTING ###############################
|
||||
################ NON-DETERMINISTIC LONG BLOCKING COMMANDS #####################
|
||||
|
||||
# Max execution time of a Lua script in milliseconds.
|
||||
# Maximum time in milliseconds for EVAL scripts, functions and in some cases
|
||||
# modules' commands before Redis can start processing or rejecting other clients.
|
||||
#
|
||||
# If the maximum execution time is reached Redis will log that a script is
|
||||
# still in execution after the maximum allowed time and will start to
|
||||
# reply to queries with an error.
|
||||
# If the maximum execution time is reached Redis will start to reply to most
|
||||
# commands with a BUSY error.
|
||||
#
|
||||
# When a long running script exceeds the maximum execution time only the
|
||||
# SCRIPT KILL and SHUTDOWN NOSAVE commands are available. The first can be
|
||||
# used to stop a script that did not yet call any write commands. The second
|
||||
# is the only way to shut down the server in the case a write command was
|
||||
# already issued by the script but the user doesn't want to wait for the natural
|
||||
# termination of the script.
|
||||
# In this state Redis will only allow a handful of commands to be executed.
|
||||
# For instance, SCRIPT KILL, FUNCTION KILL, SHUTDOWN NOSAVE and possibly some
|
||||
# module specific 'allow-busy' commands.
|
||||
#
|
||||
# Set it to 0 or a negative value for unlimited execution without warnings.
|
||||
lua-time-limit 5000
|
||||
# SCRIPT KILL and FUNCTION KILL will only be able to stop a script that did not
|
||||
# yet call any write commands, so SHUTDOWN NOSAVE may be the only way to stop
|
||||
# the server in the case a write command was already issued by the script when
|
||||
# the user doesn't want to wait for the natural termination of the script.
|
||||
#
|
||||
# The default is 5 seconds. It is possible to set it to 0 or a negative value
|
||||
# to disable this mechanism (uninterrupted execution). Note that in the past
|
||||
# this config had a different name, which is now an alias, so both of these do
|
||||
# the same:
|
||||
# lua-time-limit 5000
|
||||
# busy-reply-threshold 5000
|
||||
|
||||
################################ REDIS CLUSTER ###############################
|
||||
|
||||
|
@ -90,16 +90,16 @@ void blockClient(client *c, int btype) {
|
||||
/* Master client should never be blocked unless pause or module */
|
||||
serverAssert(!(c->flags & CLIENT_MASTER &&
|
||||
btype != BLOCKED_MODULE &&
|
||||
btype != BLOCKED_PAUSE));
|
||||
btype != BLOCKED_POSTPONE));
|
||||
|
||||
c->flags |= CLIENT_BLOCKED;
|
||||
c->btype = btype;
|
||||
server.blocked_clients++;
|
||||
server.blocked_clients_by_type[btype]++;
|
||||
addClientToTimeoutTable(c);
|
||||
if (btype == BLOCKED_PAUSE) {
|
||||
listAddNodeTail(server.paused_clients, c);
|
||||
c->paused_list_node = listLast(server.paused_clients);
|
||||
if (btype == BLOCKED_POSTPONE) {
|
||||
listAddNodeTail(server.postponed_clients, c);
|
||||
c->postponed_list_node = listLast(server.postponed_clients);
|
||||
/* Mark this client to execute its command */
|
||||
c->flags |= CLIENT_PENDING_COMMAND;
|
||||
}
|
||||
@ -189,9 +189,9 @@ void unblockClient(client *c) {
|
||||
} else if (c->btype == BLOCKED_MODULE) {
|
||||
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
|
||||
unblockClientFromModule(c);
|
||||
} else if (c->btype == BLOCKED_PAUSE) {
|
||||
listDelNode(server.paused_clients,c->paused_list_node);
|
||||
c->paused_list_node = NULL;
|
||||
} else if (c->btype == BLOCKED_POSTPONE) {
|
||||
listDelNode(server.postponed_clients,c->postponed_list_node);
|
||||
c->postponed_list_node = NULL;
|
||||
} else if (c->btype == BLOCKED_SHUTDOWN) {
|
||||
/* No special cleanup. */
|
||||
} else {
|
||||
@ -202,7 +202,7 @@ void unblockClient(client *c) {
|
||||
* we do not do it immediately after the command returns (when the
|
||||
* client got blocked) in order to be still able to access the argument
|
||||
* vector from module callbacks and updateStatsOnUnblock. */
|
||||
if (c->btype != BLOCKED_PAUSE) {
|
||||
if (c->btype != BLOCKED_POSTPONE) {
|
||||
freeClientOriginalArgv(c);
|
||||
resetClient(c);
|
||||
}
|
||||
@ -266,11 +266,11 @@ void disconnectAllBlockedClients(void) {
|
||||
client *c = listNodeValue(ln);
|
||||
|
||||
if (c->flags & CLIENT_BLOCKED) {
|
||||
/* PAUSED clients are an exception, when they'll be unblocked, the
|
||||
/* POSTPONEd clients are an exception, when they'll be unblocked, the
|
||||
* command processing will start from scratch, and the command will
|
||||
* be either executed or rejected. (unlike LIST blocked clients for
|
||||
* which the command is already in progress in a way. */
|
||||
if (c->btype == BLOCKED_PAUSE)
|
||||
if (c->btype == BLOCKED_POSTPONE)
|
||||
continue;
|
||||
|
||||
addReplyError(c,
|
||||
|
@ -3273,11 +3273,11 @@ struct redisCommand FUNCTION_Subcommands[] = {
|
||||
{"dump","Dump all functions into a serialized binary payload","O(N) where N is the number of functions","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_DUMP_History,FUNCTION_DUMP_Hints,functionDumpCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
|
||||
{"flush","Deleting all functions","O(N) where N is the number of functions deleted","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_FLUSH_History,FUNCTION_FLUSH_Hints,functionFlushCommand,-2,CMD_NOSCRIPT|CMD_WRITE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_FLUSH_Args},
|
||||
{"help","Show helpful text about the different subcommands","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_HELP_History,FUNCTION_HELP_Hints,functionHelpCommand,2,CMD_LOADING|CMD_STALE,ACL_CATEGORY_SCRIPTING},
|
||||
{"kill","Kill the function currently in execution.","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_KILL_History,FUNCTION_KILL_Hints,functionKillCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
|
||||
{"kill","Kill the function currently in execution.","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_KILL_History,FUNCTION_KILL_Hints,functionKillCommand,2,CMD_NOSCRIPT|CMD_ALLOW_BUSY,ACL_CATEGORY_SCRIPTING},
|
||||
{"list","List information about all the functions","O(N) where N is the number of functions","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_LIST_History,FUNCTION_LIST_Hints,functionListCommand,-2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_LIST_Args},
|
||||
{"load","Create a function with the given arguments (name, code, description)","O(1) (considering compilation time is redundant)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_LOAD_History,FUNCTION_LOAD_Hints,functionLoadCommand,-5,CMD_NOSCRIPT|CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_LOAD_Args},
|
||||
{"restore","Restore all the functions on the given payload","O(N) where N is the number of functions on the payload","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_RESTORE_History,FUNCTION_RESTORE_Hints,functionRestoreCommand,-3,CMD_NOSCRIPT|CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_RESTORE_Args},
|
||||
{"stats","Return information about the function currently running (name, description, duration)","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_STATS_History,FUNCTION_STATS_Hints,functionStatsCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
|
||||
{"stats","Return information about the function currently running (name, description, duration)","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_STATS_History,FUNCTION_STATS_Hints,functionStatsCommand,2,CMD_NOSCRIPT|CMD_ALLOW_BUSY,ACL_CATEGORY_SCRIPTING},
|
||||
{0}
|
||||
};
|
||||
|
||||
@ -3385,7 +3385,7 @@ struct redisCommand SCRIPT_Subcommands[] = {
|
||||
{"exists","Check existence of scripts in the script cache.","O(N) with N being the number of scripts to check (so checking a single script is an O(1) operation).","2.6.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_EXISTS_History,SCRIPT_EXISTS_Hints,scriptCommand,-3,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=SCRIPT_EXISTS_Args},
|
||||
{"flush","Remove all the scripts from the script cache.","O(N) with N being the number of scripts in cache","2.6.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_FLUSH_History,SCRIPT_FLUSH_Hints,scriptCommand,-2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=SCRIPT_FLUSH_Args},
|
||||
{"help","Show helpful text about the different subcommands","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_HELP_History,SCRIPT_HELP_Hints,scriptCommand,2,CMD_LOADING|CMD_STALE,ACL_CATEGORY_SCRIPTING},
|
||||
{"kill","Kill the script currently in execution.","O(1)","2.6.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_KILL_History,SCRIPT_KILL_Hints,scriptCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
|
||||
{"kill","Kill the script currently in execution.","O(1)","2.6.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_KILL_History,SCRIPT_KILL_Hints,scriptCommand,2,CMD_NOSCRIPT|CMD_ALLOW_BUSY,ACL_CATEGORY_SCRIPTING},
|
||||
{"load","Load the specified Lua script into the script cache.","O(N) with N being the length in bytes of the script body.","2.6.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_LOAD_History,SCRIPT_LOAD_Hints,scriptCommand,3,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=SCRIPT_LOAD_Args},
|
||||
{0}
|
||||
};
|
||||
@ -6503,13 +6503,13 @@ struct redisCommand redisCommandTable[] = {
|
||||
{"readonly","Enables read queries for a connection to a cluster replica node","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,READONLY_History,READONLY_Hints,readonlyCommand,1,CMD_FAST|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION},
|
||||
{"readwrite","Disables read queries for a connection to a cluster replica node","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,READWRITE_History,READWRITE_Hints,readwriteCommand,1,CMD_FAST|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION},
|
||||
/* connection */
|
||||
{"auth","Authenticate to the server","O(N) where N is the number of passwords defined for the user","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,AUTH_History,AUTH_Hints,authCommand,-2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,.args=AUTH_Args},
|
||||
{"auth","Authenticate to the server","O(N) where N is the number of passwords defined for the user","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,AUTH_History,AUTH_Hints,authCommand,-2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH|CMD_SENTINEL|CMD_ALLOW_BUSY,ACL_CATEGORY_CONNECTION,.args=AUTH_Args},
|
||||
{"client","A container for client connection commands","Depends on subcommand.","2.4.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,CLIENT_History,CLIENT_Hints,NULL,-2,CMD_SENTINEL,0,.subcommands=CLIENT_Subcommands},
|
||||
{"echo","Echo the given string","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,ECHO_History,ECHO_Hints,echoCommand,2,CMD_FAST,ACL_CATEGORY_CONNECTION,.args=ECHO_Args},
|
||||
{"hello","Handshake with Redis","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,HELLO_History,HELLO_Hints,helloCommand,-1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,.args=HELLO_Args},
|
||||
{"hello","Handshake with Redis","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,HELLO_History,HELLO_Hints,helloCommand,-1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH|CMD_SENTINEL|CMD_ALLOW_BUSY,ACL_CATEGORY_CONNECTION,.args=HELLO_Args},
|
||||
{"ping","Ping the server","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,PING_History,PING_Hints,pingCommand,-1,CMD_FAST|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,.args=PING_Args},
|
||||
{"quit","Close the connection","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,QUIT_History,QUIT_Hints,quitCommand,-1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH,ACL_CATEGORY_CONNECTION},
|
||||
{"reset","Reset the connection","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,RESET_History,RESET_Hints,resetCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH,ACL_CATEGORY_CONNECTION},
|
||||
{"quit","Close the connection","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,QUIT_History,QUIT_Hints,quitCommand,-1,CMD_ALLOW_BUSY|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH,ACL_CATEGORY_CONNECTION},
|
||||
{"reset","Reset the connection","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,RESET_History,RESET_Hints,resetCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH|CMD_ALLOW_BUSY,ACL_CATEGORY_CONNECTION},
|
||||
{"select","Change the selected database for the current connection","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CONNECTION,SELECT_History,SELECT_Hints,selectCommand,2,CMD_LOADING|CMD_STALE|CMD_FAST,ACL_CATEGORY_CONNECTION,.args=SELECT_Args},
|
||||
/* generic */
|
||||
{"copy","Copy a key","O(N) worst case for collections, where N is the number of nested items. O(1) for string values.","6.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GENERIC,COPY_History,COPY_Hints,copyCommand,-3,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_KEYSPACE,{{CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}},{CMD_KEY_OW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=COPY_Args},
|
||||
@ -6638,11 +6638,11 @@ struct redisCommand redisCommandTable[] = {
|
||||
{"monitor","Listen for all requests received by the server in real time",NULL,"1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,MONITOR_History,MONITOR_Hints,monitorCommand,1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0},
|
||||
{"psync","Internal command used for replication",NULL,"2.8.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,PSYNC_History,PSYNC_Hints,syncCommand,-3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NO_MULTI|CMD_NOSCRIPT,0,.args=PSYNC_Args},
|
||||
{"replconf","An internal command for configuring the replication stream","O(1)","3.0.0",CMD_DOC_SYSCMD,NULL,NULL,COMMAND_GROUP_SERVER,REPLCONF_History,REPLCONF_Hints,replconfCommand,-1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0},
|
||||
{"replicaof","Make the server a replica of another instance, or promote it as master.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,REPLICAOF_History,REPLICAOF_Hints,replicaofCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_STALE,0,.args=REPLICAOF_Args},
|
||||
{"replicaof","Make the server a replica of another instance, or promote it as master.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,REPLICAOF_History,REPLICAOF_Hints,replicaofCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_ALLOW_BUSY|CMD_NOSCRIPT|CMD_STALE,0,.args=REPLICAOF_Args},
|
||||
{"restore-asking","An internal command for migrating keys in a cluster","O(1) to create the new key and additional O(N*M) to reconstruct the serialized value, where N is the number of Redis objects composing the value and M their average size. For small string values the time complexity is thus O(1)+O(1*M) where M is small, so simply O(1). However for sorted set values the complexity is O(N*M*log(N)) because inserting values into sorted sets is O(log(N)).","3.0.0",CMD_DOC_SYSCMD,NULL,NULL,COMMAND_GROUP_SERVER,RESTORE_ASKING_History,RESTORE_ASKING_Hints,restoreCommand,-4,CMD_WRITE|CMD_DENYOOM|CMD_ASKING,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,{{CMD_KEY_OW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}}},
|
||||
{"role","Return the role of the instance in the context of replication","O(1)","2.8.12",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,ROLE_History,ROLE_Hints,roleCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_SENTINEL,ACL_CATEGORY_ADMIN|ACL_CATEGORY_DANGEROUS},
|
||||
{"save","Synchronously save the dataset to disk","O(N) where N is the total number of keys in all databases","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SAVE_History,SAVE_Hints,saveCommand,1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_NO_MULTI,0},
|
||||
{"shutdown","Synchronously save the dataset to disk and then shut down the server","O(N) when saving, where N is the total number of keys in all databases when saving data, otherwise O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SHUTDOWN_History,SHUTDOWN_Hints,shutdownCommand,-1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_NO_MULTI|CMD_SENTINEL,0,.args=SHUTDOWN_Args},
|
||||
{"shutdown","Synchronously save the dataset to disk and then shut down the server","O(N) when saving, where N is the total number of keys in all databases when saving data, otherwise O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SHUTDOWN_History,SHUTDOWN_Hints,shutdownCommand,-1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_NO_MULTI|CMD_SENTINEL|CMD_ALLOW_BUSY,0,.args=SHUTDOWN_Args},
|
||||
{"slaveof","Make the server a replica of another instance, or promote it as master. Deprecated starting with Redis 5. Use REPLICAOF instead.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SLAVEOF_History,SLAVEOF_Hints,replicaofCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_STALE,0,.args=SLAVEOF_Args},
|
||||
{"slowlog","A container for slow log commands","Depends on subcommand.","2.2.12",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SLOWLOG_History,SLOWLOG_Hints,NULL,-2,0,0,.subcommands=SLOWLOG_Subcommands},
|
||||
{"swapdb","Swaps two Redis databases","O(N) where N is the count of clients watching or blocking on keys from both databases.","4.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SWAPDB_History,SWAPDB_Hints,swapdbCommand,3,CMD_WRITE|CMD_FAST,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,.args=SWAPDB_Args},
|
||||
@ -6742,10 +6742,10 @@ struct redisCommand redisCommandTable[] = {
|
||||
{"strlen","Get the length of the value stored in a key","O(1)","2.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STRING,STRLEN_History,STRLEN_Hints,strlenCommand,2,CMD_READONLY|CMD_FAST,ACL_CATEGORY_STRING,{{CMD_KEY_RO,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=STRLEN_Args},
|
||||
{"substr","Get a substring of the string stored at a key","O(N) where N is the length of the returned string. The complexity is ultimately determined by the returned length, but because creating a substring from an existing string is very cheap, it can be considered O(1) for small strings.","1.0.0",CMD_DOC_DEPRECATED,"`GETRANGE`","2.0.0",COMMAND_GROUP_STRING,SUBSTR_History,SUBSTR_Hints,getrangeCommand,4,CMD_READONLY,ACL_CATEGORY_STRING,{{CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=SUBSTR_Args},
|
||||
/* transactions */
|
||||
{"discard","Discard all commands issued after MULTI","O(N), when N is the number of queued commands","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_TRANSACTIONS,DISCARD_History,DISCARD_Hints,discardCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST,ACL_CATEGORY_TRANSACTION},
|
||||
{"discard","Discard all commands issued after MULTI","O(N), when N is the number of queued commands","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_TRANSACTIONS,DISCARD_History,DISCARD_Hints,discardCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_ALLOW_BUSY,ACL_CATEGORY_TRANSACTION},
|
||||
{"exec","Execute all commands issued after MULTI","Depends on commands in the transaction","1.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_TRANSACTIONS,EXEC_History,EXEC_Hints,execCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SKIP_SLOWLOG,ACL_CATEGORY_TRANSACTION},
|
||||
{"multi","Mark the start of a transaction block","O(1)","1.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_TRANSACTIONS,MULTI_History,MULTI_Hints,multiCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST,ACL_CATEGORY_TRANSACTION},
|
||||
{"unwatch","Forget about all watched keys","O(1)","2.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_TRANSACTIONS,UNWATCH_History,UNWATCH_Hints,unwatchCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST,ACL_CATEGORY_TRANSACTION},
|
||||
{"watch","Watch the given keys to determine execution of the MULTI/EXEC block","O(1) for every key.","2.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_TRANSACTIONS,WATCH_History,WATCH_Hints,watchCommand,-2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST,ACL_CATEGORY_TRANSACTION,{{0,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-1,1,0}}},.args=WATCH_Args},
|
||||
{"multi","Mark the start of a transaction block","O(1)","1.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_TRANSACTIONS,MULTI_History,MULTI_Hints,multiCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_ALLOW_BUSY,ACL_CATEGORY_TRANSACTION},
|
||||
{"unwatch","Forget about all watched keys","O(1)","2.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_TRANSACTIONS,UNWATCH_History,UNWATCH_Hints,unwatchCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_ALLOW_BUSY,ACL_CATEGORY_TRANSACTION},
|
||||
{"watch","Watch the given keys to determine execution of the MULTI/EXEC block","O(1) for every key.","2.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_TRANSACTIONS,WATCH_History,WATCH_Hints,watchCommand,-2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_ALLOW_BUSY,ACL_CATEGORY_TRANSACTION,{{0,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-1,1,0}}},.args=WATCH_Args},
|
||||
{0}
|
||||
};
|
||||
|
@ -18,7 +18,8 @@
|
||||
"STALE",
|
||||
"FAST",
|
||||
"NO_AUTH",
|
||||
"SENTINEL"
|
||||
"SENTINEL",
|
||||
"ALLOW_BUSY"
|
||||
],
|
||||
"acl_categories": [
|
||||
"CONNECTION"
|
||||
|
@ -10,7 +10,8 @@
|
||||
"NOSCRIPT",
|
||||
"LOADING",
|
||||
"STALE",
|
||||
"FAST"
|
||||
"FAST",
|
||||
"ALLOW_BUSY"
|
||||
],
|
||||
"acl_categories": [
|
||||
"TRANSACTION"
|
||||
|
@ -8,7 +8,8 @@
|
||||
"container": "FUNCTION",
|
||||
"function": "functionKillCommand",
|
||||
"command_flags": [
|
||||
"NOSCRIPT"
|
||||
"NOSCRIPT",
|
||||
"ALLOW_BUSY"
|
||||
],
|
||||
"acl_categories": [
|
||||
"SCRIPTING"
|
||||
|
@ -8,7 +8,8 @@
|
||||
"container": "FUNCTION",
|
||||
"function": "functionStatsCommand",
|
||||
"command_flags": [
|
||||
"NOSCRIPT"
|
||||
"NOSCRIPT",
|
||||
"ALLOW_BUSY"
|
||||
],
|
||||
"acl_categories": [
|
||||
"SCRIPTING"
|
||||
|
@ -18,7 +18,8 @@
|
||||
"STALE",
|
||||
"FAST",
|
||||
"NO_AUTH",
|
||||
"SENTINEL"
|
||||
"SENTINEL",
|
||||
"ALLOW_BUSY"
|
||||
],
|
||||
"acl_categories": [
|
||||
"CONNECTION"
|
||||
|
@ -10,7 +10,8 @@
|
||||
"NOSCRIPT",
|
||||
"LOADING",
|
||||
"STALE",
|
||||
"FAST"
|
||||
"FAST",
|
||||
"ALLOW_BUSY"
|
||||
],
|
||||
"acl_categories": [
|
||||
"TRANSACTION"
|
||||
|
@ -7,6 +7,7 @@
|
||||
"arity": -1,
|
||||
"function": "quitCommand",
|
||||
"command_flags": [
|
||||
"ALLOW_BUSY",
|
||||
"NOSCRIPT",
|
||||
"LOADING",
|
||||
"STALE",
|
||||
|
@ -9,6 +9,7 @@
|
||||
"command_flags": [
|
||||
"NO_ASYNC_LOADING",
|
||||
"ADMIN",
|
||||
"ALLOW_BUSY",
|
||||
"NOSCRIPT",
|
||||
"STALE"
|
||||
],
|
||||
|
@ -11,7 +11,8 @@
|
||||
"LOADING",
|
||||
"STALE",
|
||||
"FAST",
|
||||
"NO_AUTH"
|
||||
"NO_AUTH",
|
||||
"ALLOW_BUSY"
|
||||
],
|
||||
"acl_categories": [
|
||||
"CONNECTION"
|
||||
|
@ -8,7 +8,8 @@
|
||||
"container": "SCRIPT",
|
||||
"function": "scriptCommand",
|
||||
"command_flags": [
|
||||
"NOSCRIPT"
|
||||
"NOSCRIPT",
|
||||
"ALLOW_BUSY"
|
||||
],
|
||||
"acl_categories": [
|
||||
"SCRIPTING"
|
||||
|
@ -17,8 +17,9 @@
|
||||
"NOSCRIPT",
|
||||
"LOADING",
|
||||
"STALE",
|
||||
"NO_MULTI",
|
||||
"SENTINEL"
|
||||
"NO_MULTI",
|
||||
"SENTINEL",
|
||||
"ALLOW_BUSY"
|
||||
],
|
||||
"arguments": [
|
||||
{
|
||||
|
@ -10,7 +10,8 @@
|
||||
"NOSCRIPT",
|
||||
"LOADING",
|
||||
"STALE",
|
||||
"FAST"
|
||||
"FAST",
|
||||
"ALLOW_BUSY"
|
||||
],
|
||||
"acl_categories": [
|
||||
"TRANSACTION"
|
||||
|
@ -10,7 +10,8 @@
|
||||
"NOSCRIPT",
|
||||
"LOADING",
|
||||
"STALE",
|
||||
"FAST"
|
||||
"FAST",
|
||||
"ALLOW_BUSY"
|
||||
],
|
||||
"acl_categories": [
|
||||
"TRANSACTION"
|
||||
|
@ -2867,7 +2867,7 @@ standardConfig configs[] = {
|
||||
createULongConfig("acllog-max-len", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.acllog_max_len, 128, INTEGER_CONFIG, NULL, NULL),
|
||||
|
||||
/* Long Long configs */
|
||||
createLongLongConfig("script-time-limit", "lua-time-limit", MODIFIABLE_CONFIG, 0, LONG_MAX, server.script_time_limit, 5000, INTEGER_CONFIG, NULL, NULL),/* milliseconds */
|
||||
createLongLongConfig("busy-reply-threshold", "lua-time-limit", MODIFIABLE_CONFIG, 0, LONG_MAX, server.busy_reply_threshold, 5000, INTEGER_CONFIG, NULL, NULL),/* milliseconds */
|
||||
createLongLongConfig("cluster-node-timeout", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.cluster_node_timeout, 15000, INTEGER_CONFIG, NULL, NULL),
|
||||
createLongLongConfig("slowlog-log-slower-than", NULL, MODIFIABLE_CONFIG, -1, LLONG_MAX, server.slowlog_log_slower_than, 10000, INTEGER_CONFIG, NULL, NULL),
|
||||
createLongLongConfig("latency-monitor-threshold", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.latency_monitor_threshold, 0, INTEGER_CONFIG, NULL, NULL),
|
||||
|
9
src/db.c
9
src/db.c
@ -1075,10 +1075,15 @@ void shutdownCommand(client *c) {
|
||||
if (!(flags & SHUTDOWN_NOSAVE) && scriptIsTimedout()) {
|
||||
/* Script timed out. Shutdown allowed only with the NOSAVE flag. See
|
||||
* also processCommand where these errors are returned. */
|
||||
if (scriptIsEval())
|
||||
if (server.busy_module_yield_flags && server.busy_module_yield_reply) {
|
||||
addReplyErrorFormat(c, "-BUSY %s", server.busy_module_yield_reply);
|
||||
} else if (server.busy_module_yield_flags) {
|
||||
addReplyErrorObject(c, shared.slowmoduleerr);
|
||||
} else if (scriptIsEval()) {
|
||||
addReplyErrorObject(c, shared.slowevalerr);
|
||||
else
|
||||
} else {
|
||||
addReplyErrorObject(c, shared.slowscripterr);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1535,8 +1535,8 @@ void luaLdbLineHook(lua_State *lua, lua_Debug *ar) {
|
||||
/* Check if a timeout occurred. */
|
||||
if (ar->event == LUA_HOOKCOUNT && ldb.step == 0 && bp == 0) {
|
||||
mstime_t elapsed = elapsedMs(rctx->start_time);
|
||||
mstime_t timelimit = server.script_time_limit ?
|
||||
server.script_time_limit : 5000;
|
||||
mstime_t timelimit = server.busy_reply_threshold ?
|
||||
server.busy_reply_threshold : 5000;
|
||||
if (elapsed >= timelimit) {
|
||||
timeout = 1;
|
||||
ldb.step = 1;
|
||||
|
90
src/module.c
90
src/module.c
@ -158,6 +158,7 @@ struct RedisModuleCtx {
|
||||
getKeysResult *keys_result;
|
||||
|
||||
struct RedisModulePoolAllocBlock *pa_head;
|
||||
long long next_yield_time;
|
||||
};
|
||||
typedef struct RedisModuleCtx RedisModuleCtx;
|
||||
|
||||
@ -650,8 +651,15 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
|
||||
if (!(ctx->flags & REDISMODULE_CTX_THREAD_SAFE)) {
|
||||
/* Modules take care of their own propagation, when we are
|
||||
* outside of call() context (timers, events, etc.). */
|
||||
if (--server.module_ctx_nesting == 0 && !server.core_propagates)
|
||||
propagatePendingCommands();
|
||||
if (--server.module_ctx_nesting == 0) {
|
||||
if (!server.core_propagates)
|
||||
propagatePendingCommands();
|
||||
if (server.busy_module_yield_flags) {
|
||||
blockingOperationEnds();
|
||||
server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
|
||||
unblockPostponedClients();
|
||||
}
|
||||
}
|
||||
}
|
||||
autoMemoryCollect(ctx);
|
||||
poolAllocRelease(ctx);
|
||||
@ -691,6 +699,18 @@ void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_f
|
||||
else if (ctx_flags & REDISMODULE_CTX_NEW_CLIENT)
|
||||
out_ctx->client = createClient(NULL);
|
||||
|
||||
/* Calculate the initial yield time for long blocked contexts.
|
||||
* in loading we depend on the server hz, but in other cases we also wait
|
||||
* for busy_reply_threshold.
|
||||
* Note that in theory we could have started processing BUSY_MODULE_YIELD_EVENTS
|
||||
* sooner, and only delay the processing for clients till the busy_reply_threshold,
|
||||
* but this carries some overheads of frequently marking clients with BLOCKED_POSTPONE
|
||||
* and releasing them, i.e. if modules only block for short periods. */
|
||||
if (server.loading)
|
||||
out_ctx->next_yield_time = getMonotonicUs() + 1000000 / server.hz;
|
||||
else
|
||||
out_ctx->next_yield_time = getMonotonicUs() + server.busy_reply_threshold * 1000;
|
||||
|
||||
if (!(ctx_flags & REDISMODULE_CTX_THREAD_SAFE)) {
|
||||
server.module_ctx_nesting++;
|
||||
}
|
||||
@ -821,6 +841,7 @@ int64_t commandFlagsFromString(char *s) {
|
||||
else if (!strcasecmp(t,"getkeys-api")) flags |= CMD_MODULE_GETKEYS;
|
||||
else if (!strcasecmp(t,"no-cluster")) flags |= CMD_MODULE_NO_CLUSTER;
|
||||
else if (!strcasecmp(t,"no-mandatory-keys")) flags |= CMD_NO_MANDATORY_KEYS;
|
||||
else if (!strcasecmp(t,"allow-busy")) flags |= CMD_ALLOW_BUSY;
|
||||
else break;
|
||||
}
|
||||
sdsfreesplitres(tokens,count);
|
||||
@ -917,6 +938,9 @@ RedisModuleCommand *moduleCreateCommandProxy(struct RedisModule *module, const c
|
||||
* * **"may-replicate"**: This command may generate replication traffic, even
|
||||
* though it's not a write command.
|
||||
* * **"no-mandatory-keys"**: All the keys this command may take are optional
|
||||
* * **"allow-busy"**: Permit the command while the server is blocked either by
|
||||
* a script or by a slow module command, see
|
||||
* RM_Yield.
|
||||
*
|
||||
* The last three parameters specify which arguments of the new command are
|
||||
* Redis keys. See https://redis.io/commands/command for more information.
|
||||
@ -1361,6 +1385,61 @@ int RM_BlockedClientMeasureTimeEnd(RedisModuleBlockedClient *bc) {
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* This API allows modules to let Redis process background tasks, and some
|
||||
* commands during long blocking execution of a module command.
|
||||
* The module can call this API periodically.
|
||||
* The flags is a bit mask of these:
|
||||
*
|
||||
* - `REDISMODULE_YIELD_FLAG_NONE`: No special flags, can perform some background
|
||||
* operations, but not process client commands.
|
||||
* - `REDISMODULE_YIELD_FLAG_CLIENTS`: Redis can also process client commands.
|
||||
*
|
||||
* The `busy_reply` argument is optional, and can be used to control the verbose
|
||||
* error string after the `-BUSY` error code.
|
||||
*
|
||||
* When the `REDISMODULE_YIELD_FLAG_CLIENTS` is used, Redis will only start
|
||||
* processing client commands after the time defined by the
|
||||
* `busy-reply-threshold` config, in which case Redis will start rejecting most
|
||||
* commands with `-BUSY` error, but allow the ones marked with the `allow-busy`
|
||||
* flag to be executed.
|
||||
* This API can also be used in thread safe context (while locked), and during
|
||||
* loading (in the `rdb_load` callback, in which case it'll reject commands with
|
||||
* the -LOADING error)
|
||||
*/
|
||||
void RM_Yield(RedisModuleCtx *ctx, int flags, const char *busy_reply) {
|
||||
long long now = getMonotonicUs();
|
||||
if (now >= ctx->next_yield_time) {
|
||||
/* In loading mode, there's no need to handle busy_module_yield_reply,
|
||||
* and busy_module_yield_flags, since redis is anyway rejecting all
|
||||
* commands with -LOADING. */
|
||||
if (server.loading) {
|
||||
/* Let redis process events */
|
||||
processEventsWhileBlocked();
|
||||
} else {
|
||||
const char *prev_busy_module_yield_reply = server.busy_module_yield_reply;
|
||||
server.busy_module_yield_reply = busy_reply;
|
||||
/* start the blocking operation if not already started. */
|
||||
if (!server.busy_module_yield_flags) {
|
||||
server.busy_module_yield_flags = flags & REDISMODULE_YIELD_FLAG_CLIENTS ?
|
||||
BUSY_MODULE_YIELD_CLIENTS : BUSY_MODULE_YIELD_EVENTS;
|
||||
blockingOperationStarts();
|
||||
}
|
||||
|
||||
/* Let redis process events */
|
||||
processEventsWhileBlocked();
|
||||
|
||||
server.busy_module_yield_reply = prev_busy_module_yield_reply;
|
||||
/* Possibly restore the previous flags in case of two nested contexts
|
||||
* that use this API with different flags, but keep the first bit
|
||||
* (PROCESS_EVENTS) set, so we know to call blockingOperationEnds on time. */
|
||||
server.busy_module_yield_flags &= ~BUSY_MODULE_YIELD_CLIENTS;
|
||||
}
|
||||
|
||||
/* decide when the next event should fire. */
|
||||
ctx->next_yield_time = now + 1000000 / server.hz;
|
||||
}
|
||||
}
|
||||
|
||||
/* Set flags defining capabilities or behavior bit flags.
|
||||
*
|
||||
* REDISMODULE_OPTIONS_HANDLE_IO_ERRORS:
|
||||
@ -6743,6 +6822,12 @@ void moduleGILBeforeUnlock() {
|
||||
* released we have to propagate here). */
|
||||
server.module_ctx_nesting--;
|
||||
propagatePendingCommands();
|
||||
|
||||
if (server.busy_module_yield_flags) {
|
||||
blockingOperationEnds();
|
||||
server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
|
||||
unblockPostponedClients();
|
||||
}
|
||||
}
|
||||
|
||||
/* Release the server lock after a thread safe API call was executed. */
|
||||
@ -10958,4 +11043,5 @@ void moduleRegisterCoreAPI(void) {
|
||||
REGISTER_API(EventLoopAdd);
|
||||
REGISTER_API(EventLoopDel);
|
||||
REGISTER_API(EventLoopAddOneShot);
|
||||
REGISTER_API(Yield);
|
||||
}
|
||||
|
@ -194,7 +194,7 @@ client *createClient(connection *conn) {
|
||||
c->peerid = NULL;
|
||||
c->sockname = NULL;
|
||||
c->client_list_node = NULL;
|
||||
c->paused_list_node = NULL;
|
||||
c->postponed_list_node = NULL;
|
||||
c->pending_read_list_node = NULL;
|
||||
c->client_tracking_redirection = 0;
|
||||
c->client_tracking_prefixes = NULL;
|
||||
@ -3628,13 +3628,19 @@ static void updateClientPauseTypeAndEndTime(void) {
|
||||
/* If the pause type is less restrictive than before, we unblock all clients
|
||||
* so they are reprocessed (may get re-paused). */
|
||||
if (type < old_type) {
|
||||
listNode *ln;
|
||||
listIter li;
|
||||
listRewind(server.paused_clients, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
client *c = listNodeValue(ln);
|
||||
unblockClient(c);
|
||||
}
|
||||
unblockPostponedClients();
|
||||
}
|
||||
}
|
||||
|
||||
/* Unblock all paused clients (ones that where blocked by BLOCKED_POSTPONE (possibly in processCommand).
|
||||
* This means they'll get re-processed in beforeSleep, and may get paused again if needed. */
|
||||
void unblockPostponedClients() {
|
||||
listNode *ln;
|
||||
listIter li;
|
||||
listRewind(server.postponed_clients, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
client *c = listNodeValue(ln);
|
||||
unblockClient(c);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -218,6 +218,10 @@ This flag should not be used directly by the module.
|
||||
#define REDISMODULE_AUX_BEFORE_RDB (1<<0)
|
||||
#define REDISMODULE_AUX_AFTER_RDB (1<<1)
|
||||
|
||||
/* RM_Yield flags */
|
||||
#define REDISMODULE_YIELD_FLAG_NONE (1<<0)
|
||||
#define REDISMODULE_YIELD_FLAG_CLIENTS (1<<1)
|
||||
|
||||
/* This type represents a timer handle, and is returned when a timer is
|
||||
* registered and used in order to invalidate a timer. It's just a 64 bit
|
||||
* number, because this is how each timer is represented inside the radix tree
|
||||
@ -919,7 +923,7 @@ REDISMODULE_API int (*RedisModule_SetCommandKeySpecBeginSearchIndex)(RedisModule
|
||||
REDISMODULE_API int (*RedisModule_SetCommandKeySpecBeginSearchKeyword)(RedisModuleCommand *command, int spec_id, const char *keyword, int startfrom) REDISMODULE_ATTR;
|
||||
REDISMODULE_API int (*RedisModule_SetCommandKeySpecFindKeysRange)(RedisModuleCommand *command, int spec_id, int lastkey, int keystep, int limit) REDISMODULE_ATTR;
|
||||
REDISMODULE_API int (*RedisModule_SetCommandKeySpecFindKeysKeynum)(RedisModuleCommand *command, int spec_id, int keynumidx, int firstkey, int keystep) REDISMODULE_ATTR;
|
||||
|
||||
REDISMODULE_API void (*RedisModule_Yield)(RedisModuleCtx *ctx, int flags, const char *busy_reply) REDISMODULE_ATTR;
|
||||
REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClient)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) REDISMODULE_ATTR;
|
||||
REDISMODULE_API int (*RedisModule_UnblockClient)(RedisModuleBlockedClient *bc, void *privdata) REDISMODULE_ATTR;
|
||||
REDISMODULE_API int (*RedisModule_IsBlockedReplyRequest)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
||||
@ -1242,6 +1246,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
|
||||
REDISMODULE_GET_API(SetCommandKeySpecBeginSearchKeyword);
|
||||
REDISMODULE_GET_API(SetCommandKeySpecFindKeysRange);
|
||||
REDISMODULE_GET_API(SetCommandKeySpecFindKeysKeynum);
|
||||
REDISMODULE_GET_API(Yield);
|
||||
REDISMODULE_GET_API(GetThreadSafeContext);
|
||||
REDISMODULE_GET_API(GetDetachedThreadSafeContext);
|
||||
REDISMODULE_GET_API(FreeThreadSafeContext);
|
||||
|
@ -85,7 +85,7 @@ int scriptInterrupt(scriptRunCtx *run_ctx) {
|
||||
}
|
||||
|
||||
long long elapsed = elapsedMs(run_ctx->start_time);
|
||||
if (elapsed < server.script_time_limit) {
|
||||
if (elapsed < server.busy_reply_threshold) {
|
||||
return SCRIPT_CONTINUE;
|
||||
}
|
||||
|
||||
|
@ -1363,7 +1363,7 @@ void luaCallFunction(scriptRunCtx* run_ctx, lua_State *lua, robj** keys, size_t
|
||||
* each time the Lua hook is invoked. */
|
||||
luaSaveOnRegistry(lua, REGISTRY_RUN_CTX_NAME, run_ctx);
|
||||
|
||||
if (server.script_time_limit > 0 && !debug_enabled) {
|
||||
if (server.busy_reply_threshold > 0 && !debug_enabled) {
|
||||
lua_sethook(lua,luaMaskCountHook,LUA_MASKCOUNT,100000);
|
||||
delhook = 1;
|
||||
} else if (debug_enabled) {
|
||||
|
46
src/server.c
46
src/server.c
@ -1613,6 +1613,8 @@ void createSharedObjects(void) {
|
||||
"-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
|
||||
shared.slowscripterr = createObject(OBJ_STRING,sdsnew(
|
||||
"-BUSY Redis is busy running a script. You can only call FUNCTION KILL or SHUTDOWN NOSAVE.\r\n"));
|
||||
shared.slowmoduleerr = createObject(OBJ_STRING,sdsnew(
|
||||
"-BUSY Redis is busy running a module command.\r\n"));
|
||||
shared.masterdownerr = createObject(OBJ_STRING,sdsnew(
|
||||
"-MASTERDOWN Link with MASTER is down and replica-serve-stale-data is set to 'no'.\r\n"));
|
||||
shared.bgsaveerr = createObject(OBJ_STRING,sdsnew(
|
||||
@ -2324,7 +2326,7 @@ void initServer(void) {
|
||||
server.client_pause_end_time = 0;
|
||||
memset(server.client_pause_per_purpose, 0,
|
||||
sizeof(server.client_pause_per_purpose));
|
||||
server.paused_clients = listCreate();
|
||||
server.postponed_clients = listCreate();
|
||||
server.events_processed_while_blocked = 0;
|
||||
server.system_memory_size = zmalloc_get_memory_size();
|
||||
server.blocked_last_cron = 0;
|
||||
@ -2411,6 +2413,8 @@ void initServer(void) {
|
||||
server.cronloops = 0;
|
||||
server.in_script = 0;
|
||||
server.in_exec = 0;
|
||||
server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
|
||||
server.busy_module_yield_reply = NULL;
|
||||
server.core_propagates = 0;
|
||||
server.propagate_no_multi = 0;
|
||||
server.module_ctx_nesting = 0;
|
||||
@ -3367,6 +3371,16 @@ int processCommand(client *c) {
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
/* If we're inside a module blocked context yielding that wants to avoid
|
||||
* processing clients, postpone the command. */
|
||||
if (server.busy_module_yield_flags != BUSY_MODULE_YIELD_NONE &&
|
||||
!(server.busy_module_yield_flags & BUSY_MODULE_YIELD_CLIENTS))
|
||||
{
|
||||
c->bpop.timeout = 0;
|
||||
blockClient(c,BLOCKED_POSTPONE);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
/* Now lookup the command and check ASAP about trivial error conditions
|
||||
* such as wrong arity, bad command name and so forth. */
|
||||
c->cmd = c->lastcmd = lookupCommand(c->argv,c->argc);
|
||||
@ -3645,30 +3659,19 @@ int processCommand(client *c) {
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
/* Lua script too slow? Only allow a limited number of commands.
|
||||
/* when a busy job is being done (script / module)
|
||||
* Only allow a limited number of commands.
|
||||
* Note that we need to allow the transactions commands, otherwise clients
|
||||
* sending a transaction with pipelining without error checking, may have
|
||||
* the MULTI plus a few initial commands refused, then the timeout
|
||||
* condition resolves, and the bottom-half of the transaction gets
|
||||
* executed, see Github PR #7022. */
|
||||
if (scriptIsTimedout() &&
|
||||
c->cmd->proc != authCommand &&
|
||||
c->cmd->proc != helloCommand &&
|
||||
c->cmd->proc != replconfCommand &&
|
||||
c->cmd->proc != multiCommand &&
|
||||
c->cmd->proc != discardCommand &&
|
||||
c->cmd->proc != watchCommand &&
|
||||
c->cmd->proc != unwatchCommand &&
|
||||
c->cmd->proc != quitCommand &&
|
||||
c->cmd->proc != resetCommand &&
|
||||
c->cmd->proc != shutdownCommand && /* more checks in shutdownCommand */
|
||||
!(c->cmd->proc == scriptCommand &&
|
||||
c->argc == 2 &&
|
||||
tolower(((char*)c->argv[1]->ptr)[0]) == 'k') &&
|
||||
!(c->cmd->proc == functionKillCommand) &&
|
||||
!(c->cmd->proc == functionStatsCommand))
|
||||
{
|
||||
if (scriptIsEval()) {
|
||||
if ((scriptIsTimedout() || server.busy_module_yield_flags) && !(c->cmd->flags & CMD_ALLOW_BUSY)) {
|
||||
if (server.busy_module_yield_flags && server.busy_module_yield_reply) {
|
||||
rejectCommandFormat(c, "-BUSY %s", server.busy_module_yield_reply);
|
||||
} else if (server.busy_module_yield_flags) {
|
||||
rejectCommand(c, shared.slowmoduleerr);
|
||||
} else if (scriptIsEval()) {
|
||||
rejectCommand(c, shared.slowevalerr);
|
||||
} else {
|
||||
rejectCommand(c, shared.slowscripterr);
|
||||
@ -3691,7 +3694,7 @@ int processCommand(client *c) {
|
||||
(server.client_pause_type == CLIENT_PAUSE_WRITE && is_may_replicate_command)))
|
||||
{
|
||||
c->bpop.timeout = 0;
|
||||
blockClient(c,BLOCKED_PAUSE);
|
||||
blockClient(c,BLOCKED_POSTPONE);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
@ -4111,6 +4114,7 @@ void addReplyFlagsForCommand(client *c, struct redisCommand *cmd) {
|
||||
{CMD_NO_ASYNC_LOADING, "no_async_loading"},
|
||||
{CMD_NO_MULTI, "no_multi"},
|
||||
{CMD_MOVABLE_KEYS, "movablekeys"},
|
||||
{CMD_ALLOW_BUSY, "allow_busy"},
|
||||
{0,NULL}
|
||||
};
|
||||
/* "sentinel" and "only-sentinel" are hidden on purpose. */
|
||||
|
20
src/server.h
20
src/server.h
@ -209,6 +209,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
|
||||
#define CMD_NO_ASYNC_LOADING (1ULL<<23)
|
||||
#define CMD_NO_MULTI (1ULL<<24)
|
||||
#define CMD_MOVABLE_KEYS (1ULL<<25) /* populated by populateCommandMovableKeys */
|
||||
#define CMD_ALLOW_BUSY ((1ULL<<26))
|
||||
|
||||
/* Command flags that describe ACLs categories. */
|
||||
#define ACL_CATEGORY_KEYSPACE (1ULL<<0)
|
||||
@ -349,7 +350,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
|
||||
#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */
|
||||
#define BLOCKED_STREAM 4 /* XREAD. */
|
||||
#define BLOCKED_ZSET 5 /* BZPOP et al. */
|
||||
#define BLOCKED_PAUSE 6 /* Blocked by CLIENT PAUSE */
|
||||
#define BLOCKED_POSTPONE 6 /* Blocked by processCommand, re-try processing later. */
|
||||
#define BLOCKED_SHUTDOWN 7 /* SHUTDOWN. */
|
||||
#define BLOCKED_NUM 8 /* Number of blocked states. */
|
||||
|
||||
@ -603,6 +604,11 @@ typedef enum {
|
||||
* Value quantization within the range will thus be no larger than 1/100th (or 1%) of any value.
|
||||
* The total size per histogram should sit around 40 KiB Bytes. */
|
||||
|
||||
/* Busy module flags, see busy_module_yield_flags */
|
||||
#define BUSY_MODULE_YIELD_NONE (0)
|
||||
#define BUSY_MODULE_YIELD_EVENTS (1<<0)
|
||||
#define BUSY_MODULE_YIELD_CLIENTS (1<<1)
|
||||
|
||||
/*-----------------------------------------------------------------------------
|
||||
* Data types
|
||||
*----------------------------------------------------------------------------*/
|
||||
@ -1136,7 +1142,7 @@ typedef struct client {
|
||||
sds peerid; /* Cached peer ID. */
|
||||
sds sockname; /* Cached connection target address. */
|
||||
listNode *client_list_node; /* list node in client list */
|
||||
listNode *paused_list_node; /* list node within the pause list */
|
||||
listNode *postponed_list_node; /* list node within the postponed list */
|
||||
listNode *pending_read_list_node; /* list node in clients pending read list */
|
||||
RedisModuleUserChangedFunc auth_callback; /* Module callback to execute
|
||||
* when the authenticated user
|
||||
@ -1211,7 +1217,8 @@ struct sharedObjectsStruct {
|
||||
robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
|
||||
*queued, *null[4], *nullarray[4], *emptymap[4], *emptyset[4],
|
||||
*emptyarray, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
|
||||
*outofrangeerr, *noscripterr, *loadingerr, *slowevalerr, *slowscripterr, *bgsaveerr,
|
||||
*outofrangeerr, *noscripterr, *loadingerr,
|
||||
*slowevalerr, *slowscripterr, *slowmoduleerr, *bgsaveerr,
|
||||
*masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr,
|
||||
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
|
||||
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink,
|
||||
@ -1459,6 +1466,8 @@ struct redisServer {
|
||||
int always_show_logo; /* Show logo even for non-stdout logging. */
|
||||
int in_script; /* Are we inside EVAL? */
|
||||
int in_exec; /* Are we inside EXEC? */
|
||||
int busy_module_yield_flags; /* Are we inside a busy module? (triggered by RM_Yield). see BUSY_MODULE_YIELD_ flags. */
|
||||
const char *busy_module_yield_reply; /* When non-null, we are inside RM_Yield. */
|
||||
int core_propagates; /* Is the core (in oppose to the module subsystem) is in charge of calling propagatePendingCommands? */
|
||||
int propagate_no_multi; /* True if propagatePendingCommands should avoid wrapping command in MULTI/EXEC */
|
||||
int module_ctx_nesting; /* moduleCreateContext() nesting level */
|
||||
@ -1502,7 +1511,7 @@ struct redisServer {
|
||||
int in_nested_call; /* If > 0, in a nested call of a call */
|
||||
rax *clients_index; /* Active clients dictionary by client ID. */
|
||||
pause_type client_pause_type; /* True if clients are currently paused */
|
||||
list *paused_clients; /* List of pause clients */
|
||||
list *postponed_clients; /* List of postponed clients */
|
||||
mstime_t client_pause_end_time; /* Time when we undo clients_paused */
|
||||
pause_event *client_pause_per_purpose[NUM_PAUSE_PURPOSES];
|
||||
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
|
||||
@ -1856,7 +1865,7 @@ struct redisServer {
|
||||
* dropping packets of a specific type */
|
||||
/* Scripting */
|
||||
client *script_caller; /* The client running script right now, or NULL */
|
||||
mstime_t script_time_limit; /* Script timeout in milliseconds */
|
||||
mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */
|
||||
int script_oom; /* OOM detected when script start */
|
||||
int script_disable_deny_script; /* Allow running commands marked "no-script" inside a script. */
|
||||
/* Lazy free */
|
||||
@ -2461,6 +2470,7 @@ void pauseClients(pause_purpose purpose, mstime_t end, pause_type type);
|
||||
void unpauseClients(pause_purpose purpose);
|
||||
int areClientsPaused(void);
|
||||
int checkClientPauseTimeoutAndReturnIfPaused(void);
|
||||
void unblockPostponedClients();
|
||||
void processEventsWhileBlocked(void);
|
||||
void whileBlockedCron();
|
||||
void blockingOperationStarts();
|
||||
|
@ -1,3 +1,8 @@
|
||||
/* define macros for having usleep */
|
||||
#define _BSD_SOURCE
|
||||
#define _DEFAULT_SOURCE
|
||||
#include <unistd.h>
|
||||
|
||||
#include "redismodule.h"
|
||||
#include <assert.h>
|
||||
#include <stdio.h>
|
||||
@ -5,6 +10,10 @@
|
||||
|
||||
#define UNUSED(V) ((void) V)
|
||||
|
||||
/* used to test processing events during slow bg operation */
|
||||
static volatile int g_slow_bg_operation = 0;
|
||||
static volatile int g_is_in_slow_bg_operation = 0;
|
||||
|
||||
void *sub_worker(void *arg) {
|
||||
// Get Redis module context
|
||||
RedisModuleCtx *ctx = (RedisModuleCtx *)arg;
|
||||
@ -99,6 +108,16 @@ void *bg_call_worker(void *arg) {
|
||||
// Acquire GIL
|
||||
RedisModule_ThreadSafeContextLock(ctx);
|
||||
|
||||
// Test slow operation yielding
|
||||
if (g_slow_bg_operation) {
|
||||
g_is_in_slow_bg_operation = 1;
|
||||
while (g_slow_bg_operation) {
|
||||
RedisModule_Yield(ctx, REDISMODULE_YIELD_FLAG_CLIENTS, "Slow module operation");
|
||||
usleep(1000);
|
||||
}
|
||||
g_is_in_slow_bg_operation = 0;
|
||||
}
|
||||
|
||||
// Call the command
|
||||
const char* cmd = RedisModule_StringPtrLen(bg->argv[1], NULL);
|
||||
RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", bg->argv + 2, bg->argc - 2);
|
||||
@ -203,6 +222,73 @@ int do_fake_bg_true(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
|
||||
/* this flag is used to work with busy commands, that might take a while
|
||||
* and ability to stop the busy work with a different command*/
|
||||
static volatile int abort_flag = 0;
|
||||
|
||||
int slow_fg_command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (argc != 2) {
|
||||
RedisModule_WrongArity(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
long long block_time = 0;
|
||||
if (RedisModule_StringToLongLong(argv[1], &block_time) != REDISMODULE_OK) {
|
||||
RedisModule_ReplyWithError(ctx, "Invalid integer value");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
uint64_t start_time = RedisModule_MonotonicMicroseconds();
|
||||
/* when not blocking indefinitely, we don't process client commands in this test. */
|
||||
int yield_flags = block_time? REDISMODULE_YIELD_FLAG_NONE: REDISMODULE_YIELD_FLAG_CLIENTS;
|
||||
while (!abort_flag) {
|
||||
RedisModule_Yield(ctx, yield_flags, "Slow module operation");
|
||||
usleep(1000);
|
||||
if (block_time && RedisModule_MonotonicMicroseconds() - start_time > (uint64_t)block_time)
|
||||
break;
|
||||
}
|
||||
|
||||
abort_flag = 0;
|
||||
RedisModule_ReplyWithLongLong(ctx, 1);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int stop_slow_fg_command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
abort_flag = 1;
|
||||
RedisModule_ReplyWithLongLong(ctx, 1);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* used to enable or disable slow operation in do_bg_rm_call */
|
||||
static int set_slow_bg_operation(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (argc != 2) {
|
||||
RedisModule_WrongArity(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
long long ll;
|
||||
if (RedisModule_StringToLongLong(argv[1], &ll) != REDISMODULE_OK) {
|
||||
RedisModule_ReplyWithError(ctx, "Invalid integer value");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
g_slow_bg_operation = ll;
|
||||
RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* used to test if we reached the slow operation in do_bg_rm_call */
|
||||
static int is_in_slow_bg_operation(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
UNUSED(argv);
|
||||
if (argc != 1) {
|
||||
RedisModule_WrongArity(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
RedisModule_ReplyWithLongLong(ctx, g_is_in_slow_bg_operation);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
@ -223,5 +309,17 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
if (RedisModule_CreateCommand(ctx, "do_fake_bg_true", do_fake_bg_true, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "slow_fg_command", slow_fg_command,"", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "stop_slow_fg_command", stop_slow_fg_command,"allow-busy", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "set_slow_bg_operation", set_slow_bg_operation, "allow-busy", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "is_in_slow_bg_operation", is_in_slow_bg_operation, "allow-busy", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
@ -2,11 +2,20 @@
|
||||
* for general ModuleDataType coverage.
|
||||
*/
|
||||
|
||||
/* define macros for having usleep */
|
||||
#define _BSD_SOURCE
|
||||
#define _DEFAULT_SOURCE
|
||||
#include <unistd.h>
|
||||
|
||||
#include "redismodule.h"
|
||||
|
||||
static RedisModuleType *datatype = NULL;
|
||||
static int load_encver = 0;
|
||||
|
||||
/* used to test processing events during slow loading */
|
||||
static volatile int slow_loading = 0;
|
||||
static volatile int is_in_slow_loading = 0;
|
||||
|
||||
#define DATATYPE_ENC_VER 1
|
||||
|
||||
typedef struct {
|
||||
@ -25,6 +34,17 @@ static void *datatype_load(RedisModuleIO *io, int encver) {
|
||||
DataType *dt = (DataType *) RedisModule_Alloc(sizeof(DataType));
|
||||
dt->intval = intval;
|
||||
dt->strval = strval;
|
||||
|
||||
if (slow_loading) {
|
||||
RedisModuleCtx *ctx = RedisModule_GetContextFromIO(io);
|
||||
is_in_slow_loading = 1;
|
||||
while (slow_loading) {
|
||||
RedisModule_Yield(ctx, REDISMODULE_YIELD_FLAG_CLIENTS, "Slow module operation");
|
||||
usleep(1000);
|
||||
}
|
||||
is_in_slow_loading = 0;
|
||||
}
|
||||
|
||||
return dt;
|
||||
}
|
||||
|
||||
@ -185,6 +205,35 @@ static int datatype_swap(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* used to enable or disable slow loading */
|
||||
static int datatype_slow_loading(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (argc != 2) {
|
||||
RedisModule_WrongArity(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
long long ll;
|
||||
if (RedisModule_StringToLongLong(argv[1], &ll) != REDISMODULE_OK) {
|
||||
RedisModule_ReplyWithError(ctx, "Invalid integer value");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
slow_loading = ll;
|
||||
RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* used to test if we reached the slow loading code */
|
||||
static int datatype_is_in_slow_loading(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
if (argc != 1) {
|
||||
RedisModule_WrongArity(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
RedisModule_ReplyWithLongLong(ctx, is_in_slow_loading);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
@ -224,5 +273,13 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
"write", 1, 1, 1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "datatype.slow_loading", datatype_slow_loading,
|
||||
"allow-loading", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "datatype.is_in_slow_loading", datatype_is_in_slow_loading,
|
||||
"allow-loading", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
@ -239,7 +239,7 @@ start_server {tags {"scripting"}} {
|
||||
|
||||
test {FUNCTION - test function kill} {
|
||||
set rd [redis_deferring_client]
|
||||
r config set script-time-limit 10
|
||||
r config set busy-reply-threshold 10
|
||||
r function load lua test REPLACE [get_function_code test {local a = 1 while true do a = a + 1 end}]
|
||||
$rd fcall test 0
|
||||
after 200
|
||||
@ -253,7 +253,7 @@ start_server {tags {"scripting"}} {
|
||||
|
||||
test {FUNCTION - test script kill not working on function} {
|
||||
set rd [redis_deferring_client]
|
||||
r config set script-time-limit 10
|
||||
r config set busy-reply-threshold 10
|
||||
r function load lua test REPLACE [get_function_code test {local a = 1 while true do a = a + 1 end}]
|
||||
$rd fcall test 0
|
||||
after 200
|
||||
@ -268,7 +268,7 @@ start_server {tags {"scripting"}} {
|
||||
|
||||
test {FUNCTION - test function kill not working on eval} {
|
||||
set rd [redis_deferring_client]
|
||||
r config set script-time-limit 10
|
||||
r config set busy-reply-threshold 10
|
||||
$rd eval {local a = 1 while true do a = a + 1 end} 0
|
||||
after 200
|
||||
catch {r ping} e
|
||||
|
@ -92,6 +92,83 @@ start_server {tags {"modules"}} {
|
||||
}
|
||||
}
|
||||
|
||||
test {Busy module command} {
|
||||
set busy_time_limit 50
|
||||
set old_time_limit [lindex [r config get busy-reply-threshold] 1]
|
||||
r config set busy-reply-threshold $busy_time_limit
|
||||
set rd [redis_deferring_client]
|
||||
|
||||
# run command that blocks until released
|
||||
set start [clock clicks -milliseconds]
|
||||
$rd slow_fg_command 0
|
||||
$rd flush
|
||||
|
||||
# make sure we get BUSY error, and that we didn't get it too early
|
||||
assert_error {*BUSY Slow module operation*} {r ping}
|
||||
assert_morethan_equal [expr [clock clicks -milliseconds]-$start] $busy_time_limit
|
||||
|
||||
# abort the blocking operation
|
||||
r stop_slow_fg_command
|
||||
wait_for_condition 50 100 {
|
||||
[catch {r ping} e] == 0
|
||||
} else {
|
||||
fail "Failed waiting for busy command to end"
|
||||
}
|
||||
$rd read
|
||||
|
||||
#run command that blocks for 200ms
|
||||
set start [clock clicks -milliseconds]
|
||||
$rd slow_fg_command 200000
|
||||
$rd flush
|
||||
after 10 ;# try to make sure redis started running the command before we proceed
|
||||
|
||||
# make sure we didn't get BUSY error, it simply blocked till the command was done
|
||||
r ping
|
||||
assert_morethan_equal [expr [clock clicks -milliseconds]-$start] 200
|
||||
$rd read
|
||||
|
||||
$rd close
|
||||
r config set busy-reply-threshold $old_time_limit
|
||||
}
|
||||
|
||||
test {RM_Call from blocked client} {
|
||||
set busy_time_limit 50
|
||||
set old_time_limit [lindex [r config get busy-reply-threshold] 1]
|
||||
r config set busy-reply-threshold $busy_time_limit
|
||||
|
||||
# trigger slow operation
|
||||
r set_slow_bg_operation 1
|
||||
r hset hash foo bar
|
||||
set rd [redis_deferring_client]
|
||||
set start [clock clicks -milliseconds]
|
||||
$rd do_bg_rm_call hgetall hash
|
||||
|
||||
# wait till we know we're blocked inside the module
|
||||
wait_for_condition 50 100 {
|
||||
[r is_in_slow_bg_operation] eq 1
|
||||
} else {
|
||||
fail "Failed waiting for slow operation to start"
|
||||
}
|
||||
|
||||
# make sure we get BUSY error, and that we didn't get here too early
|
||||
assert_error {*BUSY Slow module operation*} {r ping}
|
||||
assert_morethan [expr [clock clicks -milliseconds]-$start] $busy_time_limit
|
||||
# abort the blocking operation
|
||||
r set_slow_bg_operation 0
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[r is_in_slow_bg_operation] eq 0
|
||||
} else {
|
||||
fail "Failed waiting for slow operation to stop"
|
||||
}
|
||||
assert_equal [r ping] {PONG}
|
||||
|
||||
r config set busy-reply-threshold $old_time_limit
|
||||
set res [$rd read]
|
||||
$rd close
|
||||
set _ $res
|
||||
} {foo bar}
|
||||
|
||||
test {blocked client reaches client output buffer limit} {
|
||||
r hset hash big [string repeat x 50000]
|
||||
r hset hash bada [string repeat x 50000]
|
||||
|
@ -55,4 +55,34 @@ start_server {tags {"modules"}} {
|
||||
r copy sourcekey targetkey
|
||||
r datatype.get targetkey
|
||||
} {1234 AAA/sourcekey/targetkey}
|
||||
|
||||
test {DataType: Slow Loading} {
|
||||
r config set busy-reply-threshold 5000 ;# make sure we're using a high default
|
||||
# trigger slow loading
|
||||
r datatype.slow_loading 1
|
||||
set rd [redis_deferring_client]
|
||||
set start [clock clicks -milliseconds]
|
||||
$rd debug reload
|
||||
|
||||
# wait till we know we're blocked inside the module
|
||||
wait_for_condition 50 100 {
|
||||
[r datatype.is_in_slow_loading] eq 1
|
||||
} else {
|
||||
fail "Failed waiting for slow loading to start"
|
||||
}
|
||||
|
||||
# make sure we get LOADING error, and that we didn't get here late (not waiting for busy-reply-threshold)
|
||||
assert_error {*LOADING*} {r ping}
|
||||
assert_lessthan [expr [clock clicks -milliseconds]-$start] 2000
|
||||
|
||||
# abort the blocking operation
|
||||
r datatype.slow_loading 0
|
||||
wait_for_condition 50 100 {
|
||||
[s loading] eq {0}
|
||||
} else {
|
||||
fail "Failed waiting for loading to end"
|
||||
}
|
||||
$rd read
|
||||
$rd close
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user