Redis Functions - Added redis function unit and Lua engine

Redis function unit is located inside functions.c
and contains Redis Function implementation:
1. FUNCTION commands:
  * FUNCTION CREATE
  * FCALL
  * FCALL_RO
  * FUNCTION DELETE
  * FUNCTION KILL
  * FUNCTION INFO
2. Register engine

In addition, this commit introduce the first engine
that uses the Redis Function capabilities, the
Lua engine.
This commit is contained in:
meir@redislabs.com 2021-10-07 14:41:26 +03:00 committed by meir
parent f21dc38a6e
commit cbd463175f
21 changed files with 1647 additions and 143 deletions

View File

@ -443,6 +443,16 @@ This file also implements both the `SYNC` and `PSYNC` commands that are
used in order to perform the first synchronization between masters and
replicas, or to continue the replication after a disconnection.
Script
---
The script unit is compose of 3 units
* `script.c` - integration of scripts with Redis (commands execution, set replication/resp, ..)
* `script_lua.c` - responsible to execute Lua code, uses script.c to interact with Redis from within the Lua code.
* `function_lua.c` - contains the Lua engine implementation, uses script_lua.c to execute the Lua code.
* `functions.c` - Contains Redis Functions implementation (FUNCTION command), uses functions_lua.c if the function it wants to invoke needs the Lua engine.
* `eval.c` - Contains the `eval` implementation using `script_lua.c` to invoke the Lua code.
Other C files
---
@ -451,7 +461,6 @@ Other C files
* `sds.c` is the Redis string library, check https://github.com/antirez/sds for more information.
* `anet.c` is a library to use POSIX networking in a simpler way compared to the raw interface exposed by the kernel.
* `dict.c` is an implementation of a non-blocking hash table which rehashes incrementally.
* `scripting.c` implements Lua scripting. It is completely self-contained and isolated from the rest of the Redis implementation and is simple enough to understand if you are familiar with the Lua API.
* `cluster.c` implements the Redis Cluster. Probably a good read only after being very familiar with the rest of the Redis code base. If you want to read `cluster.c` make sure to read the [Redis Cluster specification][4].
[4]: https://redis.io/topics/cluster-spec

View File

@ -309,7 +309,7 @@ endif
REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX)
REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX)
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o connection.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o connection.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o
REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX)
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o
REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX)

View File

@ -30,6 +30,7 @@
#include "server.h"
#include "bio.h"
#include "rio.h"
#include "functions.h"
#include <signal.h>
#include <fcntl.h>
@ -754,7 +755,8 @@ int loadAppendOnlyFile(char *filename) {
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fp);
if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL,server.db) != C_OK) {
if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) {
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
goto readerr;
} else {

View File

@ -1744,6 +1744,11 @@ int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *
return genericGetKeys(0, 2, 3, 1, argv, argc, result);
}
int functionGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 2, 3, 1, argv, argc, result);
}
int lmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 1, 2, 1, argv, argc, result);

View File

@ -257,7 +257,7 @@ void scriptingInit(int setup) {
/* Lua beginners often don't use "local", this is likely to introduce
* subtle bugs in their code. To prevent problems we protect accesses
* to global variables. */
luaEnableGlobalsProtection(lua);
luaEnableGlobalsProtection(lua, 1);
lctx.lua = lua;
}
@ -443,6 +443,8 @@ void evalGenericCommand(client *c, int evalsha) {
scriptRunCtx rctx;
scriptPrepareForRun(&rctx, lctx.lua_client, c, lctx.lua_cur_script);
rctx.flags |= SCRIPT_EVAL_MODE; /* mark the current run as legacy so we
will get legacy error messages and logs */
if (!lctx.lua_replicate_commands) rctx.flags |= SCRIPT_EVAL_REPLICATION;
/* This check is for EVAL_RO, EVALSHA_RO. We want to allow only read only commands */
if ((server.script_caller->cmd->proc == evalRoCommand ||
@ -584,7 +586,7 @@ NULL
addReplyBulkCBuffer(c,sha,40);
forceCommandPropagation(c,PROPAGATE_REPL|PROPAGATE_AOF);
} else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"kill")) {
scriptKill(c);
scriptKill(c, 1);
} else if (c->argc == 3 && !strcasecmp(c->argv[1]->ptr,"debug")) {
if (clientHasPendingReplies(c)) {
addReplyError(c,"SCRIPT DEBUG must be called outside a pipeline");
@ -610,7 +612,7 @@ NULL
}
unsigned long evalMemory() {
return lua_gc(lctx.lua, LUA_GCCOUNT, 0) * 1024LL;
return luaMemory(lctx.lua);
}
dict* evalScriptsDict() {

183
src/function_lua.c Normal file
View File

@ -0,0 +1,183 @@
/*
* Copyright (c) 2021, Redis Ltd.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/*
* function_lua.c unit provides the Lua engine functionality.
* Including registering the engine and implementing the engine
* callbacks:
* * Create a function from blob (usually text)
* * Invoke a function
* * Free function memory
* * Get memory usage
*
* Uses script_lua.c to run the Lua code.
*/
#include "functions.h"
#include "script_lua.h"
#include <lua.h>
#include <lauxlib.h>
#include <lualib.h>
#define LUA_ENGINE_NAME "LUA"
#define REGISTRY_ENGINE_CTX_NAME "__ENGINE_CTX__"
#define REGISTRY_ERROR_HANDLER_NAME "__ERROR_HANDLER__"
/* Lua engine ctx */
typedef struct luaEngineCtx {
lua_State *lua;
} luaEngineCtx;
/* Lua function ctx */
typedef struct luaFunctionCtx {
/* Special ID that allows getting the Lua function object from the Lua registry */
int lua_function_ref;
} luaFunctionCtx;
/*
* Compile a given blob and save it on the registry.
* Return a function ctx with Lua ref that allows to later retrieve the
* function from the registry.
*
* Return NULL on compilation error and set the error to the err variable
*/
static void* luaEngineCreate(void *engine_ctx, sds blob, sds *err) {
luaEngineCtx *lua_engine_ctx = engine_ctx;
lua_State *lua = lua_engine_ctx->lua;
if (luaL_loadbuffer(lua, blob, sdslen(blob), "@user_function")) {
*err = sdsempty();
*err = sdscatprintf(*err, "Error compiling function: %s",
lua_tostring(lua, -1));
lua_pop(lua, 1);
return NULL;
}
serverAssert(lua_isfunction(lua, -1));
int lua_function_ref = luaL_ref(lua, LUA_REGISTRYINDEX);
luaFunctionCtx *f_ctx = zmalloc(sizeof(*f_ctx));
*f_ctx = (luaFunctionCtx ) { .lua_function_ref = lua_function_ref, };
return f_ctx;
}
/*
* Invole the give function with the given keys and args
*/
static void luaEngineCall(scriptRunCtx *run_ctx,
void *engine_ctx,
void *compiled_function,
robj **keys,
size_t nkeys,
robj **args,
size_t nargs)
{
luaEngineCtx *lua_engine_ctx = engine_ctx;
lua_State *lua = lua_engine_ctx->lua;
luaFunctionCtx *f_ctx = compiled_function;
/* Push error handler */
lua_pushstring(lua, REGISTRY_ERROR_HANDLER_NAME);
lua_gettable(lua, LUA_REGISTRYINDEX);
lua_rawgeti(lua, LUA_REGISTRYINDEX, f_ctx->lua_function_ref);
serverAssert(lua_isfunction(lua, -1));
luaCallFunction(run_ctx, lua, keys, nkeys, args, nargs, 0);
lua_pop(lua, 1); /* Pop error handler */
}
static size_t luaEngineGetUsedMemoy(void *engine_ctx) {
luaEngineCtx *lua_engine_ctx = engine_ctx;
return luaMemory(lua_engine_ctx->lua);
}
static size_t luaEngineFunctionMemoryOverhead(void *compiled_function) {
return zmalloc_size(compiled_function);
}
static size_t luaEngineMemoryOverhead(void *engine_ctx) {
luaEngineCtx *lua_engine_ctx = engine_ctx;
return zmalloc_size(lua_engine_ctx);
}
static void luaEngineFreeFunction(void *engine_ctx, void *compiled_function) {
luaEngineCtx *lua_engine_ctx = engine_ctx;
lua_State *lua = lua_engine_ctx->lua;
luaFunctionCtx *f_ctx = compiled_function;
lua_unref(lua, f_ctx->lua_function_ref);
zfree(f_ctx);
}
/* Initialize Lua engine, should be called once on start. */
int luaEngineInitEngine() {
luaEngineCtx *lua_engine_ctx = zmalloc(sizeof(*lua_engine_ctx));
lua_engine_ctx->lua = lua_open();
luaRegisterRedisAPI(lua_engine_ctx->lua);
/* Save error handler to registry */
lua_pushstring(lua_engine_ctx->lua, REGISTRY_ERROR_HANDLER_NAME);
char *errh_func = "local dbg = debug\n"
"local error_handler = function (err)\n"
" local i = dbg.getinfo(2,'nSl')\n"
" if i and i.what == 'C' then\n"
" i = dbg.getinfo(3,'nSl')\n"
" end\n"
" if i then\n"
" return i.source .. ':' .. i.currentline .. ': ' .. err\n"
" else\n"
" return err\n"
" end\n"
"end\n"
"return error_handler";
luaL_loadbuffer(lua_engine_ctx->lua, errh_func, strlen(errh_func), "@err_handler_def");
lua_pcall(lua_engine_ctx->lua,0,1,0);
lua_settable(lua_engine_ctx->lua, LUA_REGISTRYINDEX);
/* save the engine_ctx on the registry so we can get it from the Lua interpreter */
luaSaveOnRegistry(lua_engine_ctx->lua, REGISTRY_ENGINE_CTX_NAME, lua_engine_ctx);
luaEnableGlobalsProtection(lua_engine_ctx->lua, 0);
engine *lua_engine = zmalloc(sizeof(*lua_engine));
*lua_engine = (engine) {
.engine_ctx = lua_engine_ctx,
.create = luaEngineCreate,
.call = luaEngineCall,
.get_used_memory = luaEngineGetUsedMemoy,
.get_function_memory_overhead = luaEngineFunctionMemoryOverhead,
.get_engine_memory_overhead = luaEngineMemoryOverhead,
.free_function = luaEngineFreeFunction,
};
return functionsRegisterEngine(LUA_ENGINE_NAME, lua_engine);
}

538
src/functions.c Normal file
View File

@ -0,0 +1,538 @@
/*
* Copyright (c) 2021, Redis Ltd.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "functions.h"
#include "sds.h"
#include "dict.h"
#include "adlist.h"
#include "atomicvar.h"
static size_t engine_cache_memory = 0;
/* Forward declaration */
static void engineFunctionDispose(dict *d, void *obj);
struct functionsCtx {
dict *functions; /* Function name -> Function object that can be used to run the function */
size_t cache_memory /* Overhead memory (structs, dictionaries, ..) used by all the functions */;
};
dictType engineDictType = {
dictSdsCaseHash, /* hash function */
dictSdsDup, /* key dup */
NULL, /* val dup */
dictSdsKeyCaseCompare, /* key compare */
dictSdsDestructor, /* key destructor */
NULL, /* val destructor */
NULL /* allow to expand */
};
dictType functionDictType = {
dictSdsHash, /* hash function */
dictSdsDup, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */
engineFunctionDispose,/* val destructor */
NULL /* allow to expand */
};
/* Dictionary of engines */
static dict *engines = NULL;
/* Functions Ctx.
* Contains the dictionary that map a function name to
* function object and the cache memory used by all the functions */
static functionsCtx *functions_ctx = NULL;
static size_t functionMallocSize(functionInfo *fi) {
return zmalloc_size(fi) + sdsZmallocSize(fi->name)
+ (fi->desc ? sdsZmallocSize(fi->desc) : 0)
+ sdsZmallocSize(fi->code)
+ fi->ei->engine->get_function_memory_overhead(fi->function);
}
/* Dispose function memory */
static void engineFunctionDispose(dict *d, void *obj) {
UNUSED(d);
functionInfo *fi = obj;
sdsfree(fi->code);
sdsfree(fi->name);
if (fi->desc) {
sdsfree(fi->desc);
}
engine *engine = fi->ei->engine;
engine->free_function(engine->engine_ctx, fi->function);
zfree(fi);
}
/* Free function memory and detele it from the functions dictionary */
static void engineFunctionFree(functionInfo *fi, functionsCtx *functions) {
functions->cache_memory -= functionMallocSize(fi);
dictDelete(functions->functions, fi->name);
}
/* Clear all the functions from the given functions ctx */
void functionsCtxClear(functionsCtx *functions_ctx) {
dictEmpty(functions_ctx->functions, NULL);
functions_ctx->cache_memory = 0;
}
/* Free the given functions ctx */
void functionsCtxFree(functionsCtx *functions_ctx) {
functionsCtxClear(functions_ctx);
dictRelease(functions_ctx->functions);
zfree(functions_ctx);
}
/* Swap the current functions ctx with the given one.
* Free the old functions ctx. */
void functionsCtxSwapWithCurrent(functionsCtx *new_functions_ctx) {
functionsCtxFree(functions_ctx);
functions_ctx = new_functions_ctx;
}
/* return the current functions ctx */
functionsCtx* functionsCtxGetCurrent() {
return functions_ctx;
}
/* Create a new functions ctx */
functionsCtx* functionsCtxCreate() {
functionsCtx *ret = zmalloc(sizeof(functionsCtx));
ret->functions = dictCreate(&functionDictType);
ret->cache_memory = 0;
return ret;
}
/*
* Register a function info to functions dictionary
* 1. Set the function client
* 2. Add function to functions dictionary
* 3. update cache memory
*/
static void engineFunctionRegister(functionInfo *fi, functionsCtx *functions) {
int res = dictAdd(functions->functions, fi->name, fi);
serverAssert(res == DICT_OK);
functions->cache_memory += functionMallocSize(fi);
}
/*
* Creating a function info object and register it.
* Return the created object
*/
static functionInfo* engineFunctionCreate(sds name, void *function, engineInfo *ei,
sds desc, sds code, functionsCtx *functions)
{
functionInfo *fi = zmalloc(sizeof(*fi));
*fi = (functionInfo ) {
.name = sdsdup(name),
.function = function,
.ei = ei,
.code = sdsdup(code),
.desc = desc ? sdsdup(desc) : NULL,
};
engineFunctionRegister(fi, functions);
return fi;
}
/* Register an engine, should be called once by the engine on startup and give the following:
*
* - engine_name - name of the engine to register
* - engine_ctx - the engine ctx that should be used by Redis to interact with the engine */
int functionsRegisterEngine(const char *engine_name, engine *engine) {
sds engine_name_sds = sdsnew(engine_name);
if (dictFetchValue(engines, engine_name_sds)) {
serverLog(LL_WARNING, "Same engine was registered twice");
sdsfree(engine_name_sds);
return C_ERR;
}
client *c = createClient(NULL);
c->flags |= (CLIENT_DENY_BLOCKING | CLIENT_SCRIPT);
engineInfo *ei = zmalloc(sizeof(*ei));
*ei = (engineInfo ) { .name = engine_name_sds, .engine = engine, .c = c,};
dictAdd(engines, engine_name_sds, ei);
engine_cache_memory += zmalloc_size(ei) + sdsZmallocSize(ei->name) +
zmalloc_size(engine) +
engine->get_engine_memory_overhead(engine->engine_ctx);
return C_OK;
}
/*
* FUNCTION STATS
*/
void functionsStatsCommand(client *c) {
if (scriptIsRunning() && scriptIsEval()) {
addReplyErrorObject(c, shared.slowevalerr);
return;
}
addReplyMapLen(c, 2);
addReplyBulkCString(c, "running_script");
if (!scriptIsRunning()) {
addReplyNull(c);
} else {
addReplyMapLen(c, 3);
addReplyBulkCString(c, "name");
addReplyBulkCString(c, scriptCurrFunction());
addReplyBulkCString(c, "command");
client *script_client = scriptGetCaller();
addReplyArrayLen(c, script_client->argc);
for (int i = 0 ; i < script_client->argc ; ++i) {
addReplyBulkCBuffer(c, script_client->argv[i]->ptr, sdslen(script_client->argv[i]->ptr));
}
addReplyBulkCString(c, "duration_ms");
addReplyLongLong(c, scriptRunDuration());
}
addReplyBulkCString(c, "engines");
addReplyArrayLen(c, dictSize(engines));
dictIterator *iter = dictGetIterator(engines);
dictEntry *entry = NULL;
while ((entry = dictNext(iter))) {
engineInfo *ei = dictGetVal(entry);
addReplyBulkCString(c, ei->name);
}
dictReleaseIterator(iter);
}
/*
* FUNCTION LIST
*/
void functionsListCommand(client *c) {
/* general information on all the functions */
addReplyArrayLen(c, dictSize(functions_ctx->functions));
dictIterator *iter = dictGetIterator(functions_ctx->functions);
dictEntry *entry = NULL;
while ((entry = dictNext(iter))) {
functionInfo *fi = dictGetVal(entry);
addReplyMapLen(c, 3);
addReplyBulkCString(c, "name");
addReplyBulkCBuffer(c, fi->name, sdslen(fi->name));
addReplyBulkCString(c, "engine");
addReplyBulkCBuffer(c, fi->ei->name, sdslen(fi->ei->name));
addReplyBulkCString(c, "description");
if (fi->desc) {
addReplyBulkCBuffer(c, fi->desc, sdslen(fi->desc));
} else {
addReplyNull(c);
}
}
dictReleaseIterator(iter);
}
/*
* FUNCTION INFO <FUNCTION NAME> [WITHCODE]
*/
void functionsInfoCommand(client *c) {
if (c->argc > 4) {
addReplyErrorFormat(c,"wrong number of arguments for '%s' command or subcommand", c->cmd->name);
return;
}
/* dedicated information on specific function */
robj *function_name = c->argv[2];
int with_code = 0;
if (c->argc == 4) {
robj *with_code_arg = c->argv[3];
if (!strcasecmp(with_code_arg->ptr, "withcode")) {
with_code = 1;
}
}
functionInfo *fi = dictFetchValue(functions_ctx->functions, function_name->ptr);
if (!fi) {
addReplyError(c, "Function does not exists");
return;
}
addReplyMapLen(c, with_code? 4 : 3);
addReplyBulkCString(c, "name");
addReplyBulkCBuffer(c, fi->name, sdslen(fi->name));
addReplyBulkCString(c, "engine");
addReplyBulkCBuffer(c, fi->ei->name, sdslen(fi->ei->name));
addReplyBulkCString(c, "description");
if (fi->desc) {
addReplyBulkCBuffer(c, fi->desc, sdslen(fi->desc));
} else {
addReplyNull(c);
}
if (with_code) {
addReplyBulkCString(c, "code");
addReplyBulkCBuffer(c, fi->code, sdslen(fi->code));
}
}
/*
* FUNCTION DELETE <FUNCTION NAME>
*/
void functionsDeleteCommand(client *c) {
if (server.masterhost && server.repl_slave_ro && !(c->flags & CLIENT_MASTER)) {
addReplyError(c, "Can not delete a function on a read only replica");
return;
}
robj *function_name = c->argv[2];
functionInfo *fi = dictFetchValue(functions_ctx->functions, function_name->ptr);
if (!fi) {
addReplyError(c, "Function not found");
return;
}
engineFunctionFree(fi, functions_ctx);
forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF);
addReply(c, shared.ok);
}
void functionsKillCommand(client *c) {
scriptKill(c, 0);
}
static void fcallCommandGeneric(client *c, int ro) {
robj *function_name = c->argv[1];
functionInfo *fi = dictFetchValue(functions_ctx->functions, function_name->ptr);
if (!fi) {
addReplyError(c, "Function not found");
return;
}
engine *engine = fi->ei->engine;
long long numkeys;
/* Get the number of arguments that are keys */
if (getLongLongFromObject(c->argv[2], &numkeys) != C_OK) {
addReplyError(c, "Bad number of keys provided");
return;
}
if (numkeys > (c->argc - 3)) {
addReplyError(c, "Number of keys can't be greater than number of args");
return;
} else if (numkeys < 0) {
addReplyError(c, "Number of keys can't be negative");
return;
}
scriptRunCtx run_ctx;
scriptPrepareForRun(&run_ctx, fi->ei->c, c, fi->name);
if (ro) {
run_ctx.flags |= SCRIPT_READ_ONLY;
}
engine->call(&run_ctx, engine->engine_ctx, fi->function, c->argv + 3, numkeys,
c->argv + 3 + numkeys, c->argc - 3 - numkeys);
scriptResetRun(&run_ctx);
}
/*
* FCALL <FUNCTION NAME> nkeys <key1 .. keyn> <arg1 .. argn>
*/
void fcallCommand(client *c) {
fcallCommandGeneric(c, 0);
}
/*
* FCALL_RO <FUNCTION NAME> nkeys <key1 .. keyn> <arg1 .. argn>
*/
void fcallCommandReadOnly(client *c) {
fcallCommandGeneric(c, 1);
}
void functionsHelpCommand(client *c) {
const char *help[] = {
"CREATE <ENGINE NAME> <FUNCTION NAME> [REPLACE] [DESC <FUNCTION DESCRIPTION>] <FUNCTION CODE>",
" Create a new function with the given function name and code.",
"DELETE <FUNCTION NAME>",
" Delete the given function.",
"INFO <FUNCTION NAME> [WITHCODE]",
" For each function, print the following information about the function:",
" * Function name",
" * The engine used to run the function",
" * Function description",
" * Function code (only if WITHCODE is given)",
"LIST",
" Return general information on all the functions:",
" * Function name",
" * The engine used to run the function",
" * Function description",
"STATS",
" Return information about the current function running:",
" * Function name",
" * Command used to run the function",
" * Duration in MS that the function is running",
" If not function is running, return nil",
" In addition, returns a list of available engines.",
"KILL",
" Kill the current running function.",
NULL };
addReplyHelp(c, help);
}
/* Compile and save the given function, return C_OK on success and C_ERR on failure.
* In case on failure the err out param is set with relevant error message */
int functionsCreateWithFunctionCtx(sds function_name,sds engine_name, sds desc, sds code,
int replace, sds* err, functionsCtx *functions) {
engineInfo *ei = dictFetchValue(engines, engine_name);
if (!ei) {
*err = sdsnew("Engine not found");
return C_ERR;
}
engine *engine = ei->engine;
functionInfo *fi = dictFetchValue(functions->functions, function_name);
if (fi && !replace) {
*err = sdsnew("Function already exists");
return C_ERR;
}
void *function = engine->create(engine->engine_ctx, code, err);
if (*err) {
return C_ERR;
}
if (fi) {
/* free the already existing function as we are going to replace it */
engineFunctionFree(fi, functions);
}
engineFunctionCreate(function_name, function, ei, desc, code, functions);
return C_OK;
}
/*
* FUNCTION CREATE <ENGINE NAME> <FUNCTION NAME>
* [REPLACE] [DESC <FUNCTION DESCRIPTION>] <FUNCTION CODE>
*
* ENGINE NAME - name of the engine to use the run the function
* FUNCTION NAME - name to use to invoke the function
* REPLACE - optional, replace existing function
* DESCRIPTION - optional, function description
* FUNCTION CODE - function code to pass to the engine
*/
void functionsCreateCommand(client *c) {
if (server.masterhost && server.repl_slave_ro && !(c->flags & CLIENT_MASTER)) {
addReplyError(c, "Can not create a function on a read only replica");
return;
}
robj *engine_name = c->argv[2];
robj *function_name = c->argv[3];
int replace = 0;
int argc_pos = 4;
sds desc = NULL;
while (argc_pos < c->argc - 1) {
robj *next_arg = c->argv[argc_pos++];
if (!strcasecmp(next_arg->ptr, "replace")) {
replace = 1;
continue;
}
if (!strcasecmp(next_arg->ptr, "description")) {
if (argc_pos >= c->argc) {
addReplyError(c, "Bad function description");
return;
}
desc = c->argv[argc_pos++]->ptr;
continue;
}
}
if (argc_pos >= c->argc) {
addReplyError(c, "Function code is missing");
return;
}
robj *code = c->argv[argc_pos];
sds err = NULL;
if (functionsCreateWithFunctionCtx(function_name->ptr, engine_name->ptr,
desc, code->ptr, replace, &err, functions_ctx) != C_OK)
{
addReplyErrorSds(c, err);
return;
}
forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF);
addReply(c, shared.ok);
}
/* Return memory usage of all the engines combine */
unsigned long functionsMemory() {
dictIterator *iter = dictGetIterator(engines);
dictEntry *entry = NULL;
size_t engines_nemory = 0;
while ((entry = dictNext(iter))) {
engineInfo *ei = dictGetVal(entry);
engine *engine = ei->engine;
engines_nemory += engine->get_used_memory(engine->engine_ctx);
}
dictReleaseIterator(iter);
return engines_nemory;
}
/* Return memory overhead of all the engines combine */
unsigned long functionsMemoryOverhead() {
size_t memory_overhead = dictSize(engines) * sizeof(dictEntry) +
dictSlots(engines) * sizeof(dictEntry*);
memory_overhead += dictSize(functions_ctx->functions) * sizeof(dictEntry) +
dictSlots(functions_ctx->functions) * sizeof(dictEntry*) + sizeof(functionsCtx);
memory_overhead += functions_ctx->cache_memory;
memory_overhead += engine_cache_memory;
return memory_overhead;
}
/* Returns the number of functions */
unsigned long functionsNum() {
return dictSize(functions_ctx->functions);
}
dict* functionsGet() {
return functions_ctx->functions;
}
/* Initialize engine data structures.
* Should be called once on server initialization */
int functionsInit() {
engines = dictCreate(&engineDictType);
functions_ctx = functionsCtxCreate();
if (luaEngineInitEngine() != C_OK) {
return C_ERR;
}
return C_OK;
}

126
src/functions.h Normal file
View File

@ -0,0 +1,126 @@
/*
* Copyright (c) 2021, Redis Ltd.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef __FUNCTIONS_H_
#define __FUNCTIONS_H_
/*
* functions.c unit provides the Redis Functions API:
* * FUNCTION CREATE
* * FUNCTION CALL
* * FUNCTION DELETE
* * FUNCTION KILL
* * FUNCTION INFO
*
* Also contains implementation for:
* * Save/Load function from rdb
* * Register engines
*/
#include "server.h"
#include "script.h"
#include "redismodule.h"
typedef struct engine {
/* engine specific context */
void *engine_ctx;
/* Create function callback, get the engine_ctx, and function code.
* returns NULL on error and set sds to be the error message */
void* (*create)(void *engine_ctx, sds code, sds *err);
/* Invoking a function, r_ctx is an opaque object (from engine POV).
* The r_ctx should be used by the engine to interaction with Redis,
* such interaction could be running commands, set resp, or set
* replication mode
*/
void (*call)(scriptRunCtx *r_ctx, void *engine_ctx, void *compiled_function,
robj **keys, size_t nkeys, robj **args, size_t nargs);
/* get current used memory by the engine */
size_t (*get_used_memory)(void *engine_ctx);
/* Return memory overhead for a given function,
* such memory is not counted as engine memory but as general
* structs memory that hold different information */
size_t (*get_function_memory_overhead)(void *compiled_function);
/* Return memory overhead for engine (struct size holding the engine)*/
size_t (*get_engine_memory_overhead)(void *engine_ctx);
/* free the given function */
void (*free_function)(void *engine_ctx, void *compiled_function);
} engine;
/* Hold information about an engine.
* Used on rdb.c so it must be declared here. */
typedef struct engineInfo {
sds name; /* Name of the engine */
engine *engine; /* engine callbacks that allows to interact with the engine */
client *c; /* Client that is used to run commands */
} engineInfo;
/* Hold information about the specific function.
* Used on rdb.c so it must be declared here. */
typedef struct functionInfo {
sds name; /* Function name */
void *function; /* Opaque object that set by the function's engine and allow it
to run the function, usually it's the function compiled code. */
engineInfo *ei; /* Pointer to the function engine */
sds code; /* Function code */
sds desc; /* Function description */
} functionInfo;
int functionsRegisterEngine(const char *engine_name, engine *engine_ctx);
int functionsCreateWithFunctionCtx(sds function_name, sds engine_name, sds desc, sds code,
int replace, sds* err, functionsCtx *functions);
void functionsCreateCommand(client *c);
void fcallCommand(client *c);
void fcallCommandReadOnly(client *c);
void functionsDeleteCommand(client *c);
void functionsKillCommand(client *c);
void functionsStatsCommand(client *c);
void functionsInfoCommand(client *c);
void functionsListCommand(client *c);
void functionsHelpCommand(client *c);
unsigned long functionsMemory();
unsigned long functionsMemoryOverhead();
int functionsLoad(rio *rdb, int ver);
unsigned long functionsNum();
dict* functionsGet();
functionsCtx* functionsCtxGetCurrent();
functionsCtx* functionsCtxCreate();
void functionsCtxFree(functionsCtx *functions_ctx);
void functionsCtxClear(functionsCtx *functions_ctx);
void functionsCtxSwapWithCurrent(functionsCtx *functions_ctx);
int luaEngineInitEngine();
int functionsInit();
#endif /* __FUNCTIONS_H_ */

View File

@ -29,6 +29,7 @@
*/
#include "server.h"
#include "functions.h"
#include <math.h>
#include <ctype.h>
@ -1212,6 +1213,8 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
}
mh->lua_caches = mem;
mem_total+=mem;
mh->functions_caches = functionsMemoryOverhead();
mem_total+=mh->functions_caches;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
@ -1527,7 +1530,7 @@ NULL
} else if (!strcasecmp(c->argv[1]->ptr,"stats") && c->argc == 2) {
struct redisMemOverhead *mh = getMemoryOverheadData();
addReplyMapLen(c,25+mh->num_dbs);
addReplyMapLen(c,26+mh->num_dbs);
addReplyBulkCString(c,"peak.allocated");
addReplyLongLong(c,mh->peak_allocated);
@ -1553,6 +1556,9 @@ NULL
addReplyBulkCString(c,"lua.caches");
addReplyLongLong(c,mh->lua_caches);
addReplyBulkCString(c,"functions.caches");
addReplyLongLong(c,mh->functions_caches);
for (size_t j = 0; j < mh->num_dbs; j++) {
char dbname[32];
snprintf(dbname,sizeof(dbname),"db.%zd",mh->db[j].dbid);

104
src/rdb.c
View File

@ -32,6 +32,7 @@
#include "zipmap.h"
#include "endianconv.h"
#include "stream.h"
#include "functions.h"
#include <math.h>
#include <fcntl.h>
@ -1239,6 +1240,25 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
/* save functions */
dict *functions = functionsGet();
dictIterator *iter = dictGetIterator(functions);
dictEntry *entry = NULL;
while ((entry = dictNext(iter))) {
rdbSaveType(rdb, RDB_OPCODE_FUNCTION);
functionInfo* fi = dictGetVal(entry);
if (rdbSaveRawString(rdb, (unsigned char *) fi->name, sdslen(fi->name)) == -1) goto werr;
if (rdbSaveRawString(rdb, (unsigned char *) fi->ei->name, sdslen(fi->ei->name)) == -1) goto werr;
if (fi->desc) {
if (rdbSaveLen(rdb, 1) == -1) goto werr; /* desc exists */
if (rdbSaveRawString(rdb, (unsigned char *) fi->desc, sdslen(fi->desc)) == -1) goto werr;
} else {
if (rdbSaveLen(rdb, 0) == -1) goto werr; /* desc not exists */
}
if (rdbSaveRawString(rdb, (unsigned char *) fi->code, sdslen(fi->code)) == -1) goto werr;
}
dictReleaseIterator(iter);
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
dict *d = db->dict;
@ -2687,12 +2707,80 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
}
}
static int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx) {
UNUSED(ver);
sds name = NULL;
sds engine_name = NULL;
sds desc = NULL;
sds blob = NULL;
sds err = NULL;
uint64_t has_desc;
int res = C_ERR;
if (!(name = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) {
serverLog(LL_WARNING, "Failed loading function name");
goto error;
}
if (!(engine_name = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) {
serverLog(LL_WARNING, "Failed loading engine name");
goto error;
}
if ((has_desc = rdbLoadLen(rdb, NULL)) == RDB_LENERR) {
serverLog(LL_WARNING, "Failed loading function desc indicator");
goto error;
}
if (has_desc && !(desc = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) {
serverLog(LL_WARNING, "Failed loading function desc");
goto error;
}
if (!(blob = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) {
serverLog(LL_WARNING, "Failed loading function blob");
goto error;
}
if (functionsCreateWithFunctionCtx(name, engine_name, desc, blob, 0, &err, functions_ctx) != C_OK) {
serverLog(LL_WARNING, "Failed compiling and saving the function %s", err);
goto error;
}
res = C_OK;
error:
if (name) sdsfree(name);
if (engine_name) sdsfree(engine_name);
if (desc) sdsfree(desc);
if (blob) sdsfree(blob);
if (err) sdsfree(err);
return res;
}
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi, redisDb *dbarray) {
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
functionsCtx* functions_ctx = functionsCtxGetCurrent();
functionsCtxClear(functions_ctx);
rdbLoadingCtx loading_ctx = { .dbarray = server.db, .functions_ctx = functions_ctx };
int retval = rdbLoadRioWithLoadingCtx(rdb,rdbflags,rsi,&loading_ctx);
if (retval != C_OK) {
/* Loading failed, clear the function ctx */
functionsCtxClear(functions_ctx);
}
return retval;
}
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly.
* The rdb_loading_ctx argument holds objects to which the rdb will be loaded to,
* currently it only allow to set db object and functionsCtx to which the data
* will be loaded (in the future it might contains more such objects). */
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx) {
uint64_t dbid = 0;
int type, rdbver;
redisDb *db = dbarray+0;
redisDb *db = rdb_loading_ctx->dbarray+0;
char buf[1024];
int error;
long long empty_keys_skipped = 0;
@ -2764,7 +2852,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi, redisDb *dbarray) {
"databases. Exiting\n", server.dbnum);
exit(1);
}
db = dbarray+dbid;
db = rdb_loading_ctx->dbarray+dbid;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_RESIZEDB) {
/* RESIZEDB: Hint about the size of the keys in the currently
@ -2895,6 +2983,12 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi, redisDb *dbarray) {
decrRefCount(aux);
continue; /* Read next opcode. */
}
} else if (type == RDB_OPCODE_FUNCTION) {
if (rdbFunctionLoad(rdb, rdbver, rdb_loading_ctx->functions_ctx) != C_OK) {
serverLog(LL_WARNING,"Failed loading function");
goto eoferr;
}
continue;
}
/* Read key */
@ -3044,7 +3138,9 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
startLoadingFile(fp, filename,rdbflags);
rioInitWithFile(&rdb,fp);
retval = rdbLoadRio(&rdb,rdbflags,rsi,server.db);
retval = rdbLoadRio(&rdb,rdbflags,rsi);
fclose(fp);
stopLoading(retval==C_OK);
return retval;

View File

@ -100,6 +100,7 @@
#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18))
/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
#define RDB_OPCODE_FUNCTION 246 /* engine data */
#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */
#define RDB_OPCODE_IDLE 248 /* LRU idle time. */
#define RDB_OPCODE_FREQ 249 /* LFU frequency. */
@ -166,7 +167,8 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val);
int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
int rdbSaveBinaryFloatValue(rio *rdb, float val);
int rdbLoadBinaryFloatValue(rio *rdb, float *val);
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi, redisDb *db);
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);

View File

@ -32,6 +32,7 @@
#include "server.h"
#include "cluster.h"
#include "bio.h"
#include "functions.h"
#include <memory.h>
#include <sys/time.h>
@ -1738,6 +1739,7 @@ void readSyncBulkPayload(connection *conn) {
ssize_t nread, readlen, nwritten;
int use_diskless_load = useDisklessLoad();
redisDb *diskless_load_tempDb = NULL;
functionsCtx* temp_functions_ctx = NULL;
int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
EMPTYDB_NO_FLAGS;
off_t left;
@ -1913,6 +1915,7 @@ void readSyncBulkPayload(connection *conn) {
if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* Initialize empty tempDb dictionaries. */
diskless_load_tempDb = disklessLoadInitTempDb();
temp_functions_ctx = functionsCtxCreate();
moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD,
REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED,
@ -1935,6 +1938,7 @@ void readSyncBulkPayload(connection *conn) {
if (use_diskless_load) {
rio rdb;
redisDb *dbarray;
functionsCtx* functions_ctx;
int asyncLoading = 0;
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
@ -1947,8 +1951,11 @@ void readSyncBulkPayload(connection *conn) {
asyncLoading = 1;
}
dbarray = diskless_load_tempDb;
functions_ctx = temp_functions_ctx;
} else {
dbarray = server.db;
functions_ctx = functionsCtxGetCurrent();
functionsCtxClear(functions_ctx);
}
rioInitWithConn(&rdb,conn,server.repl_transfer_size);
@ -1960,7 +1967,8 @@ void readSyncBulkPayload(connection *conn) {
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading);
int loadingFailed = 0;
if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi,dbarray) != C_OK) {
rdbLoadingCtx loadingCtx = { .dbarray = dbarray, .functions_ctx = functions_ctx };
if (rdbLoadRioWithLoadingCtx(&rdb,RDBFLAGS_REPLICATION,&rsi,&loadingCtx) != C_OK) {
/* RDB loading failed. */
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization DB "
@ -1988,6 +1996,7 @@ void readSyncBulkPayload(connection *conn) {
NULL);
disklessLoadDiscardTempDb(diskless_load_tempDb);
functionsCtxFree(temp_functions_ctx);
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding temporary DB in background");
} else {
/* Remove the half-loaded data in case we started with an empty replica. */
@ -2010,6 +2019,9 @@ void readSyncBulkPayload(connection *conn) {
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Swapping active DB with loaded DB");
swapMainDbWithTempDb(diskless_load_tempDb);
/* swap existing functions ctx with the temporary one */
functionsCtxSwapWithCurrent(temp_functions_ctx);
moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD,
REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED,
NULL);

View File

@ -60,6 +60,11 @@ client* scriptGetClient() {
return curr_run_ctx->c;
}
client* scriptGetCaller() {
serverAssert(scriptIsRunning());
return curr_run_ctx->original_client;
}
/* interrupt function for scripts, should be call
* from time to time to reply some special command (like ping)
* and also check if the run should be terminated. */
@ -78,8 +83,8 @@ int scriptInterrupt(scriptRunCtx *run_ctx) {
serverLog(LL_WARNING,
"Slow script detected: still in execution after %lld milliseconds. "
"You can try killing the script using the SCRIPT KILL command.",
elapsed);
"You can try killing the script using the %s command.",
elapsed, (run_ctx->flags & SCRIPT_EVAL_MODE) ? "SCRIPT KILL" : "FUNCTION KILL");
enterScriptTimedoutMode(run_ctx);
/* Once the script timeouts we reenter the event loop to permit others
@ -159,8 +164,18 @@ int scriptIsRunning() {
return curr_run_ctx != NULL;
}
const char* scriptCurrFunction() {
serverAssert(scriptIsRunning());
return curr_run_ctx->funcname;
}
int scriptIsEval() {
serverAssert(scriptIsRunning());
return curr_run_ctx->flags & SCRIPT_EVAL_MODE;
}
/* Kill the current running script */
void scriptKill(client *c) {
void scriptKill(client *c, int is_eval) {
if (!curr_run_ctx) {
addReplyError(c, "-NOTBUSY No scripts in execution right now.");
return;
@ -177,6 +192,16 @@ void scriptKill(client *c) {
"using the SHUTDOWN NOSAVE command.");
return;
}
if (is_eval && !(curr_run_ctx->flags & SCRIPT_EVAL_MODE)) {
/* Kill a function with 'SCRIPT KILL' is not allow */
addReplyErrorObject(c, shared.slowscripterr);
return;
}
if (!is_eval && (curr_run_ctx->flags & SCRIPT_EVAL_MODE)) {
/* Kill an eval with 'FUNCTION KILL' is not allow */
addReplyErrorObject(c, shared.slowevalerr);
return;
}
curr_run_ctx->flags |= SCRIPT_KILLED;
addReply(c, shared.ok);
}
@ -430,3 +455,10 @@ mstime_t scriptTimeSnapshot() {
serverAssert(!curr_run_ctx);
return curr_run_ctx->snapshot_time;
}
long long scriptRunDuration() {
serverAssert(scriptIsRunning());
return elapsedMs(curr_run_ctx->start_time);
}

View File

@ -69,6 +69,7 @@
#define SCRIPT_READ_ONLY (1ULL<<5) /* indicate that the current script should only perform read commands */
#define SCRIPT_EVAL_REPLICATION (1ULL<<6) /* mode for eval, indicate that we replicate the
script invocation and not the effects */
#define SCRIPT_EVAL_MODE (1ULL<<7) /* Indicate that the current script called from legacy Lua */
typedef struct scriptRunCtx scriptRunCtx;
struct scriptRunCtx {
@ -87,10 +88,14 @@ int scriptSetResp(scriptRunCtx *r_ctx, int resp);
int scriptSetRepl(scriptRunCtx *r_ctx, int repl);
void scriptCall(scriptRunCtx *r_ctx, robj **argv, int argc, sds *err);
int scriptInterrupt(scriptRunCtx *r_ctx);
void scriptKill(client *c);
void scriptKill(client *c, int is_eval);
int scriptIsRunning();
const char* scriptCurrFunction();
int scriptIsEval();
int scriptIsTimedout();
client* scriptGetClient();
client* scriptGetCaller();
mstime_t scriptTimeSnapshot();
long long scriptRunDuration();
#endif /* __SCRIPT_H_ */

View File

@ -867,6 +867,8 @@ cleanup:
}
c->user = NULL;
c->argv = NULL;
c->argc = 0;
if (raise_error) {
/* If we are here we should have an error in the stack, in the
@ -1067,8 +1069,12 @@ static void luaRemoveUnsupportedFunctions(lua_State *lua) {
* the creation of globals accidentally.
*
* It should be the last to be called in the scripting engine initialization
* sequence, because it may interact with creation of globals. */
void luaEnableGlobalsProtection(lua_State *lua) {
* sequence, because it may interact with creation of globals.
*
* On Legacy Lua (eval) we need to check 'w ~= \"main\"' otherwise we will not be able
* to create the global 'function <sha> ()' variable. On Lua engine we do not use this trick
* so its not needed. */
void luaEnableGlobalsProtection(lua_State *lua, int is_eval) {
char *s[32];
sds code = sdsempty();
int j = 0;
@ -1081,7 +1087,7 @@ void luaEnableGlobalsProtection(lua_State *lua) {
s[j++]="mt.__newindex = function (t, n, v)\n";
s[j++]=" if dbg.getinfo(2) then\n";
s[j++]=" local w = dbg.getinfo(2, \"S\").what\n";
s[j++]=" if w ~= \"main\" and w ~= \"C\" then\n";
s[j++]= is_eval ? " if w ~= \"main\" and w ~= \"C\" then\n" : " if w ~= \"C\" then\n";
s[j++]=" error(\"Script attempted to create global variable '\"..tostring(n)..\"'\", 2)\n";
s[j++]=" end\n";
s[j++]=" end\n";
@ -1336,3 +1342,7 @@ void luaCallFunction(scriptRunCtx* run_ctx, lua_State *lua, robj** keys, size_t
/* remove run_ctx from registry, its only applicable for the current script. */
luaSaveOnRegistry(lua, REGISTRY_RUN_CTX_NAME, NULL);
}
unsigned long luaMemory(lua_State *lua) {
return lua_gc(lua, LUA_GCCOUNT, 0) * 1024LL;
}

View File

@ -57,10 +57,11 @@
#define REGISTRY_RUN_CTX_NAME "__RUN_CTX__"
void luaRegisterRedisAPI(lua_State* lua);
void luaEnableGlobalsProtection(lua_State *lua);
void luaEnableGlobalsProtection(lua_State *lua, int is_eval);
void luaSaveOnRegistry(lua_State* lua, const char* name, void* ptr);
void* luaGetFromRegistry(lua_State* lua, const char* name);
void luaCallFunction(scriptRunCtx* r_ctx, lua_State *lua, robj** keys, size_t nkeys, robj** args, size_t nargs, int debug_enabled);
unsigned long luaMemory(lua_State *lua);
#endif /* __SCRIPT_LUA_H_ */

View File

@ -35,7 +35,7 @@
#include "latency.h"
#include "atomicvar.h"
#include "mt19937-64.h"
#include "script.h"
#include "functions.h"
#include <time.h>
#include <signal.h>
@ -472,6 +472,31 @@ struct redisCommand scriptSubcommands[] = {
{NULL},
};
struct redisCommand functionSubcommands[] = {
{"create",functionsCreateCommand,-5,
"may-replicate no-script @scripting"},
{"delete",functionsDeleteCommand,3,
"may-replicate no-script @scripting"},
{"kill",functionsKillCommand,2,
"no-script @scripting"},
{"info",functionsInfoCommand,-3,
"no-script @scripting"},
{"list",functionsListCommand,2,
"no-script @scripting"},
{"stats",functionsStatsCommand,2,
"no-script @scripting"},
{"help",functionsHelpCommand,2,
"ok-loading ok-stale @scripting"},
{NULL},
};
struct redisCommand clientSubcommands[] = {
{"caching",clientCommand,3,
"no-script ok-loading ok-stale @connection"},
@ -2033,7 +2058,25 @@ struct redisCommand redisCommandTable[] = {
"no-auth no-script ok-stale ok-loading fast @connection"},
{"failover",failoverCommand,-1,
"admin no-script ok-stale"}
"admin no-script ok-stale"},
{"function",NULL,-2,
"",
.subcommands=functionSubcommands},
{"fcall",fcallCommand,-3,
"no-script no-monitor may-replicate no-mandatory-keys @scripting",
{{"read write", /* We pass both read and write because these flag are worst-case-scenario */
KSPEC_BS_INDEX,.bs.index={2},
KSPEC_FK_KEYNUM,.fk.keynum={0,1,1}}},
functionGetKeys},
{"fcall_ro",fcallCommandReadOnly,-3,
"no-script no-monitor no-mandatory-keys @scripting",
{{"read",
KSPEC_BS_INDEX,.bs.index={2},
KSPEC_FK_KEYNUM,.fk.keynum={0,1,1}}},
functionGetKeys},
};
/*============================ Utility functions ============================ */
@ -2209,6 +2252,11 @@ void dictSdsDestructor(dict *d, void *val)
sdsfree(val);
}
void *dictSdsDup(dict *d, const void *key) {
UNUSED(d);
return sdsdup((const sds) key);
}
int dictObjKeyCompare(dict *d, const void *key1,
const void *key2)
{
@ -3517,8 +3565,10 @@ void createSharedObjects(void) {
"-NOSCRIPT No matching script. Please use EVAL.\r\n"));
shared.loadingerr = createObject(OBJ_STRING,sdsnew(
"-LOADING Redis is loading the dataset in memory\r\n"));
shared.slowscripterr = createObject(OBJ_STRING,sdsnew(
shared.slowevalerr = createObject(OBJ_STRING,sdsnew(
"-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
shared.slowscripterr = createObject(OBJ_STRING,sdsnew(
"-BUSY Redis is busy running a script. You can only call FUNCTION KILL or SHUTDOWN NOSAVE.\r\n"));
shared.masterdownerr = createObject(OBJ_STRING,sdsnew(
"-MASTERDOWN Link with MASTER is down and replica-serve-stale-data is set to 'no'.\r\n"));
shared.bgsaveerr = createObject(OBJ_STRING,sdsnew(
@ -4345,6 +4395,7 @@ void initServer(void) {
if (server.cluster_enabled) clusterInit();
replicationScriptCacheInit();
scriptingInit(1);
functionsInit();
slowlogInit();
latencyMonitorInit();
@ -5448,9 +5499,15 @@ int processCommand(client *c) {
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
tolower(((char*)c->argv[1]->ptr)[0]) == 'k') &&
!(c->cmd->proc == functionsKillCommand) &&
!(c->cmd->proc == functionsStatsCommand))
{
rejectCommand(c, shared.slowscripterr);
if (scriptIsEval()) {
rejectCommand(c, shared.slowevalerr);
} else {
rejectCommand(c, shared.slowscripterr);
}
return C_OK;
}
@ -6281,6 +6338,7 @@ sds genRedisInfoString(const char *section) {
char peak_hmem[64];
char total_system_hmem[64];
char used_memory_lua_hmem[64];
char used_memory_vm_total_hmem[64];
char used_memory_scripts_hmem[64];
char used_memory_rss_hmem[64];
char maxmemory_hmem[64];
@ -6288,6 +6346,7 @@ sds genRedisInfoString(const char *section) {
size_t total_system_mem = server.system_memory_size;
const char *evict_policy = evictPolicyToString();
long long memory_lua = evalMemory();
long long memory_functions = functionsMemory();
struct redisMemOverhead *mh = getMemoryOverheadData();
/* Peak memory is updated from time to time by serverCron() so it
@ -6301,7 +6360,8 @@ sds genRedisInfoString(const char *section) {
bytesToHuman(peak_hmem,server.stat_peak_memory);
bytesToHuman(total_system_hmem,total_system_mem);
bytesToHuman(used_memory_lua_hmem,memory_lua);
bytesToHuman(used_memory_scripts_hmem,mh->lua_caches);
bytesToHuman(used_memory_vm_total_hmem,memory_functions + memory_lua);
bytesToHuman(used_memory_scripts_hmem,mh->lua_caches + mh->functions_caches);
bytesToHuman(used_memory_rss_hmem,server.cron_malloc_stats.process_rss);
bytesToHuman(maxmemory_hmem,server.maxmemory);
@ -6324,11 +6384,18 @@ sds genRedisInfoString(const char *section) {
"allocator_resident:%zu\r\n"
"total_system_memory:%lu\r\n"
"total_system_memory_human:%s\r\n"
"used_memory_lua:%lld\r\n"
"used_memory_lua_human:%s\r\n"
"used_memory_lua:%lld\r\n" /* deprecated, renamed to used_memory_vm_eval */
"used_memory_vm_eval:%lld\r\n"
"used_memory_lua_human:%s\r\n" /* deprecated */
"used_memory_scripts_eval:%lld\r\n"
"number_of_cached_scripts:%lu\r\n"
"number_of_functions:%lu\r\n"
"used_memory_vm_functions:%lld\r\n"
"used_memory_vm_total:%lld\r\n"
"used_memory_vm_total_human:%s\r\n"
"used_memory_functions:%lld\r\n"
"used_memory_scripts:%lld\r\n"
"used_memory_scripts_human:%s\r\n"
"number_of_cached_scripts:%lu\r\n"
"maxmemory:%lld\r\n"
"maxmemory_human:%s\r\n"
"maxmemory_policy:%s\r\n"
@ -6367,10 +6434,17 @@ sds genRedisInfoString(const char *section) {
(unsigned long)total_system_mem,
total_system_hmem,
memory_lua,
memory_lua,
used_memory_lua_hmem,
(long long) mh->lua_caches,
used_memory_scripts_hmem,
dictSize(evalScriptsDict()),
functionsNum(),
memory_functions,
memory_functions + memory_lua,
used_memory_vm_total_hmem,
(long long) mh->functions_caches,
(long long) mh->lua_caches + (long long) mh->functions_caches,
used_memory_scripts_hmem,
server.maxmemory,
maxmemory_hmem,
evict_policy,

View File

@ -822,6 +822,19 @@ typedef struct redisDb {
clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */
} redisDb;
/* forward declaration for functions ctx */
typedef struct functionsCtx functionsCtx;
/* Holding object that need to be populated during
* rdb loading. On loading end it is possible to decide
* whether not to set those objects on their rightful place.
* For example: dbarray need to be set as main database on
* successful loading and dropped on failure. */
typedef struct rdbLoadingCtx {
redisDb* dbarray;
functionsCtx* functions_ctx;
}rdbLoadingCtx;
/* Client MULTI/EXEC state */
typedef struct multiCmd {
robj **argv;
@ -1122,7 +1135,7 @@ struct sharedObjectsStruct {
robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
*queued, *null[4], *nullarray[4], *emptymap[4], *emptyset[4],
*emptyarray, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
*outofrangeerr, *noscripterr, *loadingerr, *slowscripterr, *bgsaveerr,
*outofrangeerr, *noscripterr, *loadingerr, *slowevalerr, *slowscripterr, *bgsaveerr,
*masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr,
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink,
@ -1203,6 +1216,7 @@ struct redisMemOverhead {
size_t clients_normal;
size_t aof_buffer;
size_t lua_caches;
size_t functions_caches;
size_t overhead_total;
size_t dataset;
size_t total_keys;
@ -2670,6 +2684,7 @@ int sintercardGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysRes
int zunionInterDiffGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result);
int zunionInterDiffStoreGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result);
int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int functionGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
@ -2761,6 +2776,7 @@ uint64_t dictSdsCaseHash(const void *key);
int dictSdsKeyCompare(dict *d, const void *key1, const void *key2);
int dictSdsKeyCaseCompare(dict *d, const void *key1, const void *key2);
void dictSdsDestructor(dict *d, void *val);
void *dictSdsDup(dict *d, const void *key);
/* Git SHA1 */
char *redisGitSHA1(void);

View File

@ -521,6 +521,12 @@ foreach testType {Successful Aborted} {
# Set a key value on replica to check status during loading, on failure and after swapping db
$replica set mykey myvalue
# Set a function value on replica to check status during loading, on failure and after swapping db
$replica function create LUA test {return 'hello1'}
# Set a function value on master to check it reaches the replica when replication ends
$master function create LUA test {return 'hello2'}
# Force the replica to try another full sync (this time it will have matching master replid)
$master multi
$master client kill type replica
@ -552,6 +558,9 @@ foreach testType {Successful Aborted} {
# Ensure we still see old values while async_loading is in progress and also not LOADING status
assert_equal [$replica get mykey] "myvalue"
# Ensure we still can call old function while async_loading is in progress
assert_equal [$replica fcall test 0] "hello1"
# Make sure we're still async_loading to validate previous assertion
assert_equal [s -1 async_loading] 1
@ -576,6 +585,9 @@ foreach testType {Successful Aborted} {
# Ensure we see old values from replica
assert_equal [$replica get mykey] "myvalue"
# Ensure we still can call old function
assert_equal [$replica fcall test 0] "hello1"
# Make sure amount of replica keys didn't change
assert_equal [$replica dbsize] 2001
}
@ -595,6 +607,9 @@ foreach testType {Successful Aborted} {
# Ensure we don't see anymore the key that was stored only to replica and also that we don't get LOADING status
assert_equal [$replica GET mykey] ""
# Ensure we got the new function
assert_equal [$replica fcall test 0] "hello2"
# Make sure amount of keys matches master
assert_equal [$replica dbsize] 1010
}
@ -624,6 +639,10 @@ test {diskless loading short read} {
$replica config set dynamic-hz no
# Try to fill the master with all types of data types / encodings
set start [clock clicks -milliseconds]
# Set a function value to check short read handling on functions
r function create LUA test {return 'hello1'}
for {set k 0} {$k < 3} {incr k} {
for {set i 0} {$i < 10} {incr i} {
r set "$k int_$i" [expr {int(rand()*10000)}]

280
tests/unit/functions.tcl Normal file
View File

@ -0,0 +1,280 @@
start_server {tags {"scripting"}} {
test {FUNCTION - Basic usage} {
r function create LUA test {return 'hello'}
r fcall test 0
} {hello}
test {FUNCTION - Create an already exiting function raise error} {
catch {
r function create LUA test {return 'hello1'}
} e
set _ $e
} {*Function already exists*}
test {FUNCTION - Create function with unexisting engine} {
catch {
r function create bad_engine test {return 'hello1'}
} e
set _ $e
} {*Engine not found*}
test {FUNCTION - Test uncompiled script} {
catch {
r function create LUA test1 {bad script}
} e
set _ $e
} {*Error compiling function*}
test {FUNCTION - test replace argument} {
r function create LUA test REPLACE {return 'hello1'}
r fcall test 0
} {hello1}
test {FUNCTION - test replace argument with function creation failure keeps old function} {
catch {r function create LUA test REPLACE {error}}
r fcall test 0
} {hello1}
test {FUNCTION - test function delete} {
r function delete test
catch {
r fcall test 0
} e
set _ $e
} {*Function not found*}
test {FUNCTION - test description argument} {
r function create LUA test DESCRIPTION {some description} {return 'hello'}
r function list
} {{name test engine LUA description {some description}}}
test {FUNCTION - test info specific function} {
r function info test WITHCODE
} {name test engine LUA description {some description} code {return 'hello'}}
test {FUNCTION - test info without code} {
r function info test
} {name test engine LUA description {some description}}
test {FUNCTION - test info on function that does not exists} {
catch {
r function info bad_function_name
} e
set _ $e
} {*Function does not exists*}
test {FUNCTION - test info with bad number of arguments} {
catch {
r function info test WITHCODE bad_arg
} e
set _ $e
} {*wrong number of arguments*}
test {FUNCTION - test fcall bad arguments} {
catch {
r fcall test bad_arg
} e
set _ $e
} {*Bad number of keys provided*}
test {FUNCTION - test fcall bad number of keys arguments} {
catch {
r fcall test 10 key1
} e
set _ $e
} {*Number of keys can't be greater than number of args*}
test {FUNCTION - test fcall negative number of keys} {
catch {
r fcall test -1 key1
} e
set _ $e
} {*Number of keys can't be negative*}
test {FUNCTION - test function delete on not exiting function} {
catch {
r function delete test1
} e
set _ $e
} {*Function not found*}
test {FUNCTION - test function kill when function is not running} {
catch {
r function kill
} e
set _ $e
} {*No scripts in execution*}
test {FUNCTION - test wrong subcommand} {
catch {
r function bad_subcommand
} e
set _ $e
} {*Unknown subcommand*}
test {FUNCTION - test loading from rdb} {
r debug reload
r fcall test 0
} {hello}
test {FUNCTION - test fcall_ro with write command} {
r function create lua test REPLACE {return redis.call('set', 'x', '1')}
catch { r fcall_ro test 0 } e
set _ $e
} {*Write commands are not allowed from read-only scripts*}
test {FUNCTION - test fcall_ro with read only commands} {
r function create lua test REPLACE {return redis.call('get', 'x')}
r set x 1
r fcall_ro test 0
} {1}
test {FUNCTION - test keys and argv} {
r function create lua test REPLACE {return redis.call('set', KEYS[1], ARGV[1])}
r fcall test 1 x foo
r get x
} {foo}
test {FUNCTION - test command get keys on fcall} {
r COMMAND GETKEYS fcall test 1 x foo
} {x}
test {FUNCTION - test command get keys on fcall_ro} {
r COMMAND GETKEYS fcall_ro test 1 x foo
} {x}
test {FUNCTION - test function kill} {
set rd [redis_deferring_client]
r config set script-time-limit 10
r function create lua test REPLACE {local a = 1 while true do a = a + 1 end}
$rd fcall test 0
after 200
catch {r ping} e
assert_match {BUSY*} $e
assert_match {running_script {name test command {fcall test 0} duration_ms *} engines LUA} [r FUNCTION STATS]
r function kill
after 200 ; # Give some time to Lua to call the hook again...
assert_equal [r ping] "PONG"
}
test {FUNCTION - test script kill not working on function} {
set rd [redis_deferring_client]
r config set script-time-limit 10
r function create lua test REPLACE {local a = 1 while true do a = a + 1 end}
$rd fcall test 0
after 200
catch {r ping} e
assert_match {BUSY*} $e
catch {r script kill} e
assert_match {BUSY*} $e
r function kill
after 200 ; # Give some time to Lua to call the hook again...
assert_equal [r ping] "PONG"
}
test {FUNCTION - test function kill not working on eval} {
set rd [redis_deferring_client]
r config set script-time-limit 10
$rd eval {local a = 1 while true do a = a + 1 end} 0
after 200
catch {r ping} e
assert_match {BUSY*} $e
catch {r function kill} e
assert_match {BUSY*} $e
r script kill
after 200 ; # Give some time to Lua to call the hook again...
assert_equal [r ping] "PONG"
}
}
start_server {tags {"scripting repl"}} {
start_server {} {
test "Connect a replica to the master instance" {
r -1 slaveof [srv 0 host] [srv 0 port]
wait_for_condition 50 100 {
[s -1 role] eq {slave} &&
[string match {*master_link_status:up*} [r -1 info replication]]
} else {
fail "Can't turn the instance into a replica"
}
}
test {FUNCTION - creation is replicated to replica} {
r function create LUA test DESCRIPTION {some description} {return 'hello'}
wait_for_condition 50 100 {
[r -1 function list] eq {{name test engine LUA description {some description}}}
} else {
fail "Failed waiting for function to replicate to replica"
}
}
test {FUNCTION - call on replica} {
r -1 fcall test 0
} {hello}
test {FUNCTION - delete is replicated to replica} {
r function delete test
wait_for_condition 50 100 {
[r -1 function list] eq {}
} else {
fail "Failed waiting for function to replicate to replica"
}
}
test "Disconnecting the replica from master instance" {
r -1 slaveof no one
# creating a function after disconnect to make sure function
# is replicated on rdb phase
r function create LUA test DESCRIPTION {some description} {return 'hello'}
# reconnect the replica
r -1 slaveof [srv 0 host] [srv 0 port]
wait_for_condition 50 100 {
[s -1 role] eq {slave} &&
[string match {*master_link_status:up*} [r -1 info replication]]
} else {
fail "Can't turn the instance into a replica"
}
}
test "FUNCTION - test replication to replica on rdb phase" {
r -1 fcall test 0
} {hello}
test "FUNCTION - test replication to replica on rdb phase info command" {
r -1 function info test WITHCODE
} {name test engine LUA description {some description} code {return 'hello'}}
test "FUNCTION - create on read only replica" {
catch {
r -1 function create LUA test DESCRIPTION {some description} {return 'hello'}
} e
set _ $e
} {*Can not create a function on a read only replica*}
test "FUNCTION - delete on read only replica" {
catch {
r -1 function delete test
} e
set _ $e
} {*Can not delete a function on a read only replica*}
test "FUNCTION - function effect is replicated to replica" {
r function create LUA test REPLACE {return redis.call('set', 'x', '1')}
r fcall test 0
assert {[r get x] eq {1}}
wait_for_condition 50 100 {
[r -1 get x] eq {1}
} else {
fail "Failed waiting function effect to be replicated to replica"
}
}
test "FUNCTION - modify key space of read only replica" {
catch {
r -1 fcall test 0
} e
set _ $e
} {*can't write against a read only replica*}
}
}

View File

@ -1,48 +1,87 @@
foreach is_eval {0 1} {
if {$is_eval == 1} {
proc run_script {args} {
r eval {*}$args
}
proc run_script_ro {args} {
r eval_ro {*}$args
}
proc run_script_on_connection {args} {
[lindex $args 0] eval {*}[lrange $args 1 end]
}
proc kill_script {args} {
r script kill
}
} else {
proc run_script {args} {
r function create LUA test replace [lindex $args 0]
r fcall test {*}[lrange $args 1 end]
}
proc run_script_ro {args} {
r function create LUA test replace [lindex $args 0]
r fcall_ro test {*}[lrange $args 1 end]
}
proc run_script_on_connection {args} {
set rd [lindex $args 0]
$rd function create LUA test replace [lindex $args 1]
# read the ok reply of function create
$rd read
$rd fcall test {*}[lrange $args 2 end]
}
proc kill_script {args} {
r function kill
}
}
start_server {tags {"scripting"}} {
test {EVAL - Does Lua interpreter replies to our requests?} {
r eval {return 'hello'} 0
run_script {return 'hello'} 0
} {hello}
test {EVAL - Lua integer -> Redis protocol type conversion} {
r eval {return 100.5} 0
run_script {return 100.5} 0
} {100}
test {EVAL - Lua string -> Redis protocol type conversion} {
r eval {return 'hello world'} 0
run_script {return 'hello world'} 0
} {hello world}
test {EVAL - Lua true boolean -> Redis protocol type conversion} {
r eval {return true} 0
run_script {return true} 0
} {1}
test {EVAL - Lua false boolean -> Redis protocol type conversion} {
r eval {return false} 0
run_script {return false} 0
} {}
test {EVAL - Lua status code reply -> Redis protocol type conversion} {
r eval {return {ok='fine'}} 0
run_script {return {ok='fine'}} 0
} {fine}
test {EVAL - Lua error reply -> Redis protocol type conversion} {
catch {
r eval {return {err='this is an error'}} 0
run_script {return {err='this is an error'}} 0
} e
set _ $e
} {this is an error}
test {EVAL - Lua table -> Redis protocol type conversion} {
r eval {return {1,2,3,'ciao',{1,2}}} 0
run_script {return {1,2,3,'ciao',{1,2}}} 0
} {1 2 3 ciao {1 2}}
test {EVAL - Are the KEYS and ARGV arrays populated correctly?} {
r eval {return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}} 2 a{t} b{t} c{t} d{t}
run_script {return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}} 2 a{t} b{t} c{t} d{t}
} {a{t} b{t} c{t} d{t}}
test {EVAL - is Lua able to call Redis API?} {
r set mykey myval
r eval {return redis.call('get',KEYS[1])} 1 mykey
run_script {return redis.call('get',KEYS[1])} 1 mykey
} {myval}
if {$is_eval eq 1} {
# eval sha is only relevant for is_eval Lua
test {EVALSHA - Can we call a SHA1 if already defined?} {
r evalsha fd758d1589d044dd850a6f05d52f2eefd27f033f 1 mykey
} {myval}
@ -60,10 +99,11 @@ start_server {tags {"scripting"}} {
catch {r evalsha ffd632c7d33e571e9f24556ebed26c3479a87130 0} e
set _ $e
} {NOSCRIPT*}
} ;# is_eval
test {EVAL - Redis integer -> Lua type conversion} {
r set x 0
r eval {
run_script {
local foo = redis.pcall('incr',KEYS[1])
return {type(foo),foo}
} 1 x
@ -71,7 +111,7 @@ start_server {tags {"scripting"}} {
test {EVAL - Redis bulk -> Lua type conversion} {
r set mykey myval
r eval {
run_script {
local foo = redis.pcall('get',KEYS[1])
return {type(foo),foo}
} 1 mykey
@ -82,14 +122,14 @@ start_server {tags {"scripting"}} {
r rpush mylist a
r rpush mylist b
r rpush mylist c
r eval {
run_script {
local foo = redis.pcall('lrange',KEYS[1],0,-1)
return {type(foo),foo[1],foo[2],foo[3],# foo}
} 1 mylist
} {table a b c 3}
test {EVAL - Redis status reply -> Lua type conversion} {
r eval {
run_script {
local foo = redis.pcall('set',KEYS[1],'myval')
return {type(foo),foo['ok']}
} 1 mykey
@ -97,7 +137,7 @@ start_server {tags {"scripting"}} {
test {EVAL - Redis error reply -> Lua type conversion} {
r set mykey myval
r eval {
run_script {
local foo = redis.pcall('incr',KEYS[1])
return {type(foo),foo['err']}
} 1 mykey
@ -105,7 +145,7 @@ start_server {tags {"scripting"}} {
test {EVAL - Redis nil bulk reply -> Lua type conversion} {
r del mykey
r eval {
run_script {
local foo = redis.pcall('get',KEYS[1])
return {type(foo),foo == false}
} 1 mykey
@ -115,13 +155,13 @@ start_server {tags {"scripting"}} {
r set mykey "this is DB 9"
r select 10
r set mykey "this is DB 10"
r eval {return redis.pcall('get',KEYS[1])} 1 mykey
run_script {return redis.pcall('get',KEYS[1])} 1 mykey
} {this is DB 10} {singledb:skip}
test {EVAL - SELECT inside Lua should not affect the caller} {
# here we DB 10 is selected
r set mykey "original value"
r eval {return redis.pcall('select','9')} 0
run_script {return redis.pcall('select','9')} 0
set res [r get mykey]
r select 9
set res
@ -131,7 +171,7 @@ start_server {tags {"scripting"}} {
test {EVAL - Script can't run more than configured time limit} {
r config set lua-time-limit 1
catch {
r eval {
run_script {
local i = 0
while true do i=i+1 end
} 0
@ -142,71 +182,74 @@ start_server {tags {"scripting"}} {
test {EVAL - Scripts can't run blpop command} {
set e {}
catch {r eval {return redis.pcall('blpop','x',0)} 0} e
catch {run_script {return redis.pcall('blpop','x',0)} 0} e
set e
} {*not allowed*}
test {EVAL - Scripts can't run brpop command} {
set e {}
catch {r eval {return redis.pcall('brpop','empty_list',0)} 0} e
catch {run_script {return redis.pcall('brpop','empty_list',0)} 0} e
set e
} {*not allowed*}
test {EVAL - Scripts can't run brpoplpush command} {
set e {}
catch {r eval {return redis.pcall('brpoplpush','empty_list1', 'empty_list2',0)} 0} e
catch {run_script {return redis.pcall('brpoplpush','empty_list1', 'empty_list2',0)} 0} e
set e
} {*not allowed*}
test {EVAL - Scripts can't run blmove command} {
set e {}
catch {r eval {return redis.pcall('blmove','empty_list1', 'empty_list2', 'LEFT', 'LEFT', 0)} 0} e
catch {run_script {return redis.pcall('blmove','empty_list1', 'empty_list2', 'LEFT', 'LEFT', 0)} 0} e
set e
} {*not allowed*}
test {EVAL - Scripts can't run bzpopmin command} {
set e {}
catch {r eval {return redis.pcall('bzpopmin','empty_zset', 0)} 0} e
catch {run_script {return redis.pcall('bzpopmin','empty_zset', 0)} 0} e
set e
} {*not allowed*}
test {EVAL - Scripts can't run bzpopmax command} {
set e {}
catch {r eval {return redis.pcall('bzpopmax','empty_zset', 0)} 0} e
catch {run_script {return redis.pcall('bzpopmax','empty_zset', 0)} 0} e
set e
} {*not allowed*}
test {EVAL - Scripts can't run XREAD and XREADGROUP with BLOCK option} {
r del s
r xgroup create s g $ MKSTREAM
set res [r eval {return redis.pcall('xread','STREAMS','s','$')} 1 s]
set res [run_script {return redis.pcall('xread','STREAMS','s','$')} 1 s]
assert {$res eq {}}
assert_error "*xread command is not allowed with BLOCK option from scripts" {r eval {return redis.pcall('xread','BLOCK',0,'STREAMS','s','$')} 1 s}
set res [r eval {return redis.pcall('xreadgroup','group','g','c','STREAMS','s','>')} 1 s]
assert_error "*xread command is not allowed with BLOCK option from scripts" {run_script {return redis.pcall('xread','BLOCK',0,'STREAMS','s','$')} 1 s}
set res [run_script {return redis.pcall('xreadgroup','group','g','c','STREAMS','s','>')} 1 s]
assert {$res eq {}}
assert_error "*xreadgroup command is not allowed with BLOCK option from scripts" {r eval {return redis.pcall('xreadgroup','group','g','c','BLOCK',0,'STREAMS','s','>')} 1 s}
assert_error "*xreadgroup command is not allowed with BLOCK option from scripts" {run_script {return redis.pcall('xreadgroup','group','g','c','BLOCK',0,'STREAMS','s','>')} 1 s}
}
if {$is_eval eq 1} {
# only is_eval Lua can not execute randomkey
test {EVAL - Scripts can't run certain commands} {
set e {}
r debug lua-always-replicate-commands 0
catch {
r eval "redis.pcall('randomkey'); return redis.pcall('set','x','ciao')" 0
run_script "redis.pcall('randomkey'); return redis.pcall('set','x','ciao')" 0
} e
r debug lua-always-replicate-commands 1
set e
} {*not allowed after*} {needs:debug}
} ;# is_eval
test {EVAL - No arguments to redis.call/pcall is considered an error} {
set e {}
catch {r eval {return redis.call()} 0} e
catch {run_script {return redis.call()} 0} e
set e
} {*one argument*}
test {EVAL - redis.call variant raises a Lua error on Redis cmd error (1)} {
set e {}
catch {
r eval "redis.call('nosuchcommand')" 0
run_script "redis.call('nosuchcommand')" 0
} e
set e
} {*Unknown Redis*}
@ -214,7 +257,7 @@ start_server {tags {"scripting"}} {
test {EVAL - redis.call variant raises a Lua error on Redis cmd error (1)} {
set e {}
catch {
r eval "redis.call('get','a','b','c')" 0
run_script "redis.call('get','a','b','c')" 0
} e
set e
} {*number of args*}
@ -223,7 +266,7 @@ start_server {tags {"scripting"}} {
set e {}
r set foo bar
catch {
r eval {redis.call('lpush',KEYS[1],'val')} 1 foo
run_script {redis.call('lpush',KEYS[1],'val')} 1 foo
} e
set e
} {*against a key*}
@ -232,7 +275,7 @@ start_server {tags {"scripting"}} {
# We must return the table as a string because otherwise
# Redis converts floats to ints and we get 0 and 1023 instead
# of 0.0003 and 1023.2 as the parsed output.
r eval {return
run_script {return
table.concat(
cjson.decode(
"[0.0, -5e3, -1, 0.3e-3, 1023.2, 0e10]"), " ")
@ -240,13 +283,13 @@ start_server {tags {"scripting"}} {
} {0 -5000 -1 0.0003 1023.2 0}
test {EVAL - JSON string decoding} {
r eval {local decoded = cjson.decode('{"keya": "a", "keyb": "b"}')
run_script {local decoded = cjson.decode('{"keya": "a", "keyb": "b"}')
return {decoded.keya, decoded.keyb}
} 0
} {a b}
test {EVAL - cmsgpack can pack double?} {
r eval {local encoded = cmsgpack.pack(0.1)
run_script {local encoded = cmsgpack.pack(0.1)
local h = ""
for i = 1, #encoded do
h = h .. string.format("%02x",string.byte(encoded,i))
@ -256,7 +299,7 @@ start_server {tags {"scripting"}} {
} {cb3fb999999999999a}
test {EVAL - cmsgpack can pack negative int64?} {
r eval {local encoded = cmsgpack.pack(-1099511627776)
run_script {local encoded = cmsgpack.pack(-1099511627776)
local h = ""
for i = 1, #encoded do
h = h .. string.format("%02x",string.byte(encoded,i))
@ -266,7 +309,7 @@ start_server {tags {"scripting"}} {
} {d3ffffff0000000000}
test {EVAL - cmsgpack can pack and unpack circular references?} {
r eval {local a = {x=nil,y=5}
run_script {local a = {x=nil,y=5}
local b = {x=a}
a['x'] = b
local encoded = cmsgpack.pack(a)
@ -298,7 +341,7 @@ start_server {tags {"scripting"}} {
} {82a17905a17881a17882a17905a17881a17882a17905a17881a17882a17905a17881a17882a17905a17881a17882a17905a17881a17882a17905a17881a17882a17905a17881a178c0 1 1}
test {EVAL - Numerical sanity check from bitop} {
r eval {assert(0x7fffffff == 2147483647, "broken hex literals");
run_script {assert(0x7fffffff == 2147483647, "broken hex literals");
assert(0xffffffff == -1 or 0xffffffff == 2^32-1,
"broken hex literals");
assert(tostring(-1) == "-1", "broken tostring()");
@ -309,7 +352,7 @@ start_server {tags {"scripting"}} {
} {}
test {EVAL - Verify minimal bitop functionality} {
r eval {assert(bit.tobit(1) == 1);
run_script {assert(bit.tobit(1) == 1);
assert(bit.band(1) == 1);
assert(bit.bxor(1,2) == 3);
assert(bit.bor(1,2,4,8,16,32,64,128) == 255)
@ -317,20 +360,22 @@ start_server {tags {"scripting"}} {
} {}
test {EVAL - Able to parse trailing comments} {
r eval {return 'hello' --trailing comment} 0
run_script {return 'hello' --trailing comment} 0
} {hello}
test {EVAL_RO - Successful case} {
r set foo bar
assert_equal bar [r eval_ro {return redis.call('get', KEYS[1]);} 1 foo]
assert_equal bar [run_script_ro {return redis.call('get', KEYS[1]);} 1 foo]
}
test {EVAL_RO - Cannot run write commands} {
r set foo bar
catch {r eval_ro {redis.call('del', KEYS[1]);} 1 foo} e
catch {run_script_ro {redis.call('del', KEYS[1]);} 1 foo} e
set e
} {*Write commands are not allowed from read-only scripts*}
if {$is_eval eq 1} {
# script command is only relevant for is_eval Lua
test {SCRIPTING FLUSH - is able to clear the scripts cache?} {
r set mykey myval
set v [r evalsha fd758d1589d044dd850a6f05d52f2eefd27f033f 1 mykey]
@ -361,6 +406,7 @@ start_server {tags {"scripting"}} {
[r evalsha b534286061d4b9e4026607613b95c06c06015ae8 0]
} {b534286061d4b9e4026607613b95c06c06015ae8 loaded}
# reply oredering is only relevant for is_eval Lua
test "In the context of Lua the output of random commands gets ordered" {
r debug lua-always-replicate-commands 0
r del myset
@ -387,19 +433,20 @@ start_server {tags {"scripting"}} {
r sadd myset a b c
r eval {return redis.call('sort',KEYS[1],'by','_','get','#','get','_:*')} 1 myset
} {a {} b {} c {}} {cluster:skip}
} ;# is_eval
test "redis.sha1hex() implementation" {
list [r eval {return redis.sha1hex('')} 0] \
[r eval {return redis.sha1hex('Pizza & Mandolino')} 0]
list [run_script {return redis.sha1hex('')} 0] \
[run_script {return redis.sha1hex('Pizza & Mandolino')} 0]
} {da39a3ee5e6b4b0d3255bfef95601890afd80709 74822d82031af7493c20eefa13bd07ec4fada82f}
test {Globals protection reading an undeclared global variable} {
catch {r eval {return a} 0} e
catch {run_script {return a} 0} e
set e
} {*ERR*attempted to access * global*}
test {Globals protection setting an undeclared global*} {
catch {r eval {a=10} 0} e
catch {run_script {a=10} 0} e
set e
} {*ERR*attempted to create global*}
@ -417,14 +464,16 @@ start_server {tags {"scripting"}} {
}
r set foo 5
set res {}
lappend res [r eval $decr_if_gt 1 foo 2]
lappend res [r eval $decr_if_gt 1 foo 2]
lappend res [r eval $decr_if_gt 1 foo 2]
lappend res [r eval $decr_if_gt 1 foo 2]
lappend res [r eval $decr_if_gt 1 foo 2]
lappend res [run_script $decr_if_gt 1 foo 2]
lappend res [run_script $decr_if_gt 1 foo 2]
lappend res [run_script $decr_if_gt 1 foo 2]
lappend res [run_script $decr_if_gt 1 foo 2]
lappend res [run_script $decr_if_gt 1 foo 2]
set res
} {4 3 2 2 2}
if {$is_eval eq 1} {
# random handling is only relevant for is_eval Lua
test {Scripting engine resets PRNG at every script execution} {
set rand1 [r eval {return tostring(math.random())} 0]
set rand2 [r eval {return tostring(math.random())} 0]
@ -444,13 +493,14 @@ start_server {tags {"scripting"}} {
assert_equal $rand1 $rand2
assert {$rand2 ne $rand3}
}
} ;# is_eval
test {EVAL does not leak in the Lua stack} {
r set x 0
# Use a non blocking client to speedup the loop.
set rd [redis_deferring_client]
for {set j 0} {$j < 10000} {incr j} {
$rd eval {return redis.call("incr",KEYS[1])} 1 x
run_script_on_connection $rd {return redis.call("incr",KEYS[1])} 1 x
}
for {set j 0} {$j < 10000} {incr j} {
$rd read
@ -464,9 +514,9 @@ start_server {tags {"scripting"}} {
r flushall
r config set appendonly yes
r config set aof-use-rdb-preamble no
r eval {redis.call("set",KEYS[1],"100")} 1 foo
r eval {redis.call("incr",KEYS[1])} 1 foo
r eval {redis.call("incr",KEYS[1])} 1 foo
run_script {redis.call("set",KEYS[1],"100")} 1 foo
run_script {redis.call("incr",KEYS[1])} 1 foo
run_script {redis.call("incr",KEYS[1])} 1 foo
wait_for_condition 50 100 {
[s aof_rewrite_in_progress] == 0
} else {
@ -481,6 +531,8 @@ start_server {tags {"scripting"}} {
set res
} {102} {external:skip}
if {$is_eval eq 1} {
# script propagation is irrelevant on functions
test {EVAL timeout from AOF} {
# generate a long running script that is propagated to the AOF as script
# make sure that the script times out during loading
@ -528,9 +580,11 @@ start_server {tags {"scripting"}} {
assert {[r mget a{t} b{t} c{t} d{t}] eq {1 2 3 4}}
assert {[r spop myset] eq {}}
}
} ;# is_eval
test {Call Redis command with many args from Lua (issue #1764)} {
r eval {
run_script {
local i
local x={}
redis.call('del','mylist')
@ -543,7 +597,7 @@ start_server {tags {"scripting"}} {
} {1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100}
test {Number conversion precision test (issue #1118)} {
r eval {
run_script {
local value = 9007199254740991
redis.call("set","foo",value)
return redis.call("get","foo")
@ -551,19 +605,19 @@ start_server {tags {"scripting"}} {
} {9007199254740991}
test {String containing number precision test (regression of issue #1118)} {
r eval {
run_script {
redis.call("set", "key", "12039611435714932082")
return redis.call("get", "key")
} 1 key
} {12039611435714932082}
test {Verify negative arg count is error instead of crash (issue #1842)} {
catch { r eval { return "hello" } -12 } e
catch { run_script { return "hello" } -12 } e
set e
} {ERR Number of keys can't be negative}
test {Correct handling of reused argv (issue #1939)} {
r eval {
run_script {
for i = 0, 10 do
redis.call('SET', 'a{t}', '1')
redis.call('MGET', 'a{t}', 'b{t}', 'c{t}')
@ -576,7 +630,7 @@ start_server {tags {"scripting"}} {
test {Functions in the Redis namespace are able to report errors} {
catch {
r eval {
run_script {
redis.sha1hex()
} 0
} e
@ -594,22 +648,22 @@ start_server {tags {"scripting"}} {
assert_equal $res $expected_dict
# Test RESP3 client with script in both RESP2 and RESP3 modes
set res [r eval {redis.setresp(3); return redis.call('hgetall', KEYS[1])} 1 hash]
set res [run_script {redis.setresp(3); return redis.call('hgetall', KEYS[1])} 1 hash]
assert_equal $res $expected_dict
set res [r eval {redis.setresp(2); return redis.call('hgetall', KEYS[1])} 1 hash]
set res [run_script {redis.setresp(2); return redis.call('hgetall', KEYS[1])} 1 hash]
assert_equal $res $expected_list
# Test RESP2 client with script in both RESP2 and RESP3 modes
r HELLO 2
set res [r eval {redis.setresp(3); return redis.call('hgetall', KEYS[1])} 1 hash]
set res [run_script {redis.setresp(3); return redis.call('hgetall', KEYS[1])} 1 hash]
assert_equal $res $expected_list
set res [r eval {redis.setresp(2); return redis.call('hgetall', KEYS[1])} 1 hash]
set res [run_script {redis.setresp(2); return redis.call('hgetall', KEYS[1])} 1 hash]
assert_equal $res $expected_list
}
test {Script return recursive object} {
r readraw 1
set res [r eval {local a = {}; local b = {a}; a[1] = b; return a} 0]
set res [run_script {local a = {}; local b = {a}; a[1] = b; return a} 0]
# drain the response
while {true} {
if {$res == "-ERR reached lua stack limit"} {
@ -640,11 +694,11 @@ start_server {tags {"scripting"}} {
test {Timedout read-only scripts can be killed by SCRIPT KILL} {
set rd [redis_deferring_client]
r config set lua-time-limit 10
$rd eval {while true do end} 0
run_script_on_connection $rd {while true do end} 0
after 200
catch {r ping} e
assert_match {BUSY*} $e
r script kill
kill_script
after 200 ; # Give some time to Lua to call the hook again...
assert_equal [r ping] "PONG"
$rd close
@ -653,7 +707,7 @@ start_server {tags {"scripting"}} {
test {Timedout read-only scripts can be killed by SCRIPT KILL even when use pcall} {
set rd [redis_deferring_client]
r config set lua-time-limit 10
$rd eval {local f = function() while 1 do redis.call('ping') end end while 1 do pcall(f) end} 0
run_script_on_connection $rd {local f = function() while 1 do redis.call('ping') end end while 1 do pcall(f) end} 0
wait_for_condition 50 100 {
[catch {r ping} e] == 1
@ -663,7 +717,7 @@ start_server {tags {"scripting"}} {
catch {r ping} e
assert_match {BUSY*} $e
r script kill
kill_script
wait_for_condition 50 100 {
[catch {r ping} e] == 0
@ -685,8 +739,14 @@ start_server {tags {"scripting"}} {
# senging (in a pipeline):
# 1. eval "while 1 do redis.call('ping') end" 0
# 2. ping
set buf "*3\r\n\$4\r\neval\r\n\$33\r\nwhile 1 do redis.call('ping') end\r\n\$1\r\n0\r\n"
append buf "*1\r\n\$4\r\nping\r\n"
if {$is_eval == 1} {
set buf "*3\r\n\$4\r\neval\r\n\$33\r\nwhile 1 do redis.call('ping') end\r\n\$1\r\n0\r\n"
append buf "*1\r\n\$4\r\nping\r\n"
} else {
set buf "*6\r\n\$8\r\nfunction\r\n\$6\r\ncreate\r\n\$3\r\nlua\r\n\$4\r\ntest\r\n\$7\r\nreplace\r\n\$33\r\nwhile 1 do redis.call('ping') end\r\n"
append buf "*3\r\n\$5\r\nfcall\r\n\$4\r\ntest\r\n\$1\r\n0\r\n"
append buf "*1\r\n\$4\r\nping\r\n"
}
$rd write $buf
$rd flush
@ -698,7 +758,7 @@ start_server {tags {"scripting"}} {
catch {r ping} e
assert_match {BUSY*} $e
r script kill
kill_script
wait_for_condition 50 100 {
[catch {r ping} e] == 0
} else {
@ -706,6 +766,11 @@ start_server {tags {"scripting"}} {
}
assert_equal [r ping] "PONG"
if {$is_eval == 0} {
# read the ok reply of function create
assert_match {OK} [$rd read]
}
catch {$rd read} res
assert_match {*killed by user*} $res
@ -717,18 +782,18 @@ start_server {tags {"scripting"}} {
test {Timedout script link is still usable after Lua returns} {
r config set lua-time-limit 10
r eval {for i=1,100000 do redis.call('ping') end return 'ok'} 0
run_script {for i=1,100000 do redis.call('ping') end return 'ok'} 0
r ping
} {PONG}
test {Timedout scripts that modified data can't be killed by SCRIPT KILL} {
set rd [redis_deferring_client]
r config set lua-time-limit 10
$rd eval {redis.call('set',KEYS[1],'y'); while true do end} 1 x
run_script_on_connection $rd {redis.call('set',KEYS[1],'y'); while true do end} 1 x
after 200
catch {r ping} e
assert_match {BUSY*} $e
catch {r script kill} e
catch {kill_script} e
assert_match {UNKILLABLE*} $e
catch {r ping} e
assert_match {BUSY*} $e
@ -761,11 +826,11 @@ foreach cmdrepl {0 1} {
# One with an error, but still executing a command.
# SHA is: 67164fc43fa971f76fd1aaeeaf60c1c178d25876
catch {
r eval {redis.call('incr',KEYS[1]); redis.call('nonexisting')} 1 x
run_script {redis.call('incr',KEYS[1]); redis.call('nonexisting')} 1 x
}
# One command is correct:
# SHA is: 6f5ade10a69975e903c6d07b10ea44c6382381a5
r eval {return redis.call('incr',KEYS[1])} 1 x
run_script {return redis.call('incr',KEYS[1])} 1 x
} {2}
test "Connect a replica to the master instance $rt" {
@ -778,6 +843,7 @@ foreach cmdrepl {0 1} {
}
}
if {$is_eval eq 1} {
test "Now use EVALSHA against the master, with both SHAs $rt" {
# The server should replicate successful and unsuccessful
# commands as EVAL instead of EVALSHA.
@ -794,11 +860,12 @@ foreach cmdrepl {0 1} {
fail "Expected 4 in x, but value is '[r -1 get x]'"
}
}
} ;# is_eval
test "Replication of script multiple pushes to list with BLPOP $rt" {
set rd [redis_deferring_client]
$rd brpop a 0
r eval {
run_script {
redis.call("lpush",KEYS[1],"1");
redis.call("lpush",KEYS[1],"2");
} 1 a
@ -812,6 +879,7 @@ foreach cmdrepl {0 1} {
set res
} {a 1}
if {$is_eval eq 1} {
test "EVALSHA replication when first call is readonly $rt" {
r del x
r eval {if tonumber(ARGV[1]) > 0 then redis.call('incr', KEYS[1]) end} 1 x 0
@ -823,16 +891,17 @@ foreach cmdrepl {0 1} {
fail "Expected 1 in x, but value is '[r -1 get x]'"
}
}
} ;# is_eval
test "Lua scripts using SELECT are replicated correctly $rt" {
r eval {
run_script {
redis.call("set","foo1","bar1")
redis.call("select","10")
redis.call("incr","x")
redis.call("select","11")
redis.call("incr","z")
} 0
r eval {
run_script {
redis.call("set","foo1","bar1")
redis.call("select","10")
redis.call("incr","x")
@ -861,6 +930,8 @@ start_server {tags {"scripting repl external:skip"}} {
}
}
if {$is_eval eq 1} {
# replicate_commands is the default on Redis Function
test "Redis.replicate_commands() must be issued before any write" {
r eval {
redis.call('set','foo','bar');
@ -884,11 +955,11 @@ start_server {tags {"scripting repl external:skip"}} {
r debug lua-always-replicate-commands 1
set e
} {*only after turning on*}
} ;# is_eval
test "Redis.set_repl() don't accept invalid values" {
catch {
r eval {
redis.replicate_commands();
run_script {
redis.set_repl(12345);
} 0
} e
@ -897,8 +968,7 @@ start_server {tags {"scripting repl external:skip"}} {
test "Test selective replication of certain Redis commands from Lua" {
r del a b c d
r eval {
redis.replicate_commands();
run_script {
redis.call('set','a','1');
redis.set_repl(redis.REPL_NONE);
redis.call('set','b','2');
@ -924,24 +994,37 @@ start_server {tags {"scripting repl external:skip"}} {
}
test "PRNG is seeded randomly for command replication" {
set a [
r eval {
redis.replicate_commands();
return math.random()*100000;
} 0
]
set b [
r eval {
redis.replicate_commands();
return math.random()*100000;
} 0
]
if {$is_eval eq 1} {
# on is_eval Lua we need to call redis.replicate_commands() to get real randomization
set a [
run_script {
redis.replicate_commands()
return math.random()*100000;
} 0
]
set b [
run_script {
redis.replicate_commands()
return math.random()*100000;
} 0
]
} else {
set a [
run_script {
return math.random()*100000;
} 0
]
set b [
run_script {
return math.random()*100000;
} 0
]
}
assert {$a ne $b}
}
test "Using side effects is not a problem with command replication" {
r eval {
redis.replicate_commands();
run_script {
redis.call('set','time',redis.call('time')[1])
} 0
@ -956,6 +1039,7 @@ start_server {tags {"scripting repl external:skip"}} {
}
}
if {$is_eval eq 1} {
start_server {tags {"scripting external:skip"}} {
r script debug sync
r eval {return 'hello'} 0
@ -984,12 +1068,13 @@ start_server {tags {"scripting needs:debug external:skip"}} {
r write $cmd
r flush
set ret [r read]
assert_match {*Unknown Redis command called from*} $ret
assert_match {*Unknown Redis command called from script*} $ret
# make sure the server is still ok
reconnect
assert_equal [r ping] {PONG}
}
}
} ;# is_eval
start_server {tags {"scripting resp3 needs:debug"}} {
r debug set-disable-deny-scripts 1
@ -999,7 +1084,7 @@ start_server {tags {"scripting resp3 needs:debug"}} {
r readraw 1
test {test resp3 big number protocol parsing} {
set ret [r eval "redis.setresp($i);return redis.call('debug', 'protocol', 'bignum')" 0]
set ret [run_script "redis.setresp($i);return redis.call('debug', 'protocol', 'bignum')" 0]
if {$client_proto == 2 || $i == 2} {
# if either Lua or the clien is RESP2 the reply will be RESP2
assert_equal $ret {$37}
@ -1021,7 +1106,7 @@ start_server {tags {"scripting resp3 needs:debug"}} {
}
test {test resp3 map protocol parsing} {
set ret [r eval "redis.setresp($i);return redis.call('debug', 'protocol', 'map')" 0]
set ret [run_script "redis.setresp($i);return redis.call('debug', 'protocol', 'map')" 0]
if {$client_proto == 2 || $i == 2} {
# if either Lua or the clien is RESP2 the reply will be RESP2
assert_equal $ret {*6}
@ -1034,7 +1119,7 @@ start_server {tags {"scripting resp3 needs:debug"}} {
}
test {test resp3 set protocol parsing} {
set ret [r eval "redis.setresp($i);return redis.call('debug', 'protocol', 'set')" 0]
set ret [run_script "redis.setresp($i);return redis.call('debug', 'protocol', 'set')" 0]
if {$client_proto == 2 || $i == 2} {
# if either Lua or the clien is RESP2 the reply will be RESP2
assert_equal $ret {*3}
@ -1047,7 +1132,7 @@ start_server {tags {"scripting resp3 needs:debug"}} {
}
test {test resp3 double protocol parsing} {
set ret [r eval "redis.setresp($i);return redis.call('debug', 'protocol', 'double')" 0]
set ret [run_script "redis.setresp($i);return redis.call('debug', 'protocol', 'double')" 0]
if {$client_proto == 2 || $i == 2} {
# if either Lua or the clien is RESP2 the reply will be RESP2
assert_equal $ret {$5}
@ -1058,7 +1143,7 @@ start_server {tags {"scripting resp3 needs:debug"}} {
}
test {test resp3 null protocol parsing} {
set ret [r eval "redis.setresp($i);return redis.call('debug', 'protocol', 'null')" 0]
set ret [run_script "redis.setresp($i);return redis.call('debug', 'protocol', 'null')" 0]
if {$client_proto == 2} {
# null is a special case in which a Lua client format does not effect the reply to the client
assert_equal $ret {$-1}
@ -1068,7 +1153,7 @@ start_server {tags {"scripting resp3 needs:debug"}} {
} {}
test {test resp3 verbatim protocol parsing} {
set ret [r eval "redis.setresp($i);return redis.call('debug', 'protocol', 'verbatim')" 0]
set ret [run_script "redis.setresp($i);return redis.call('debug', 'protocol', 'verbatim')" 0]
if {$client_proto == 2 || $i == 2} {
# if either Lua or the clien is RESP2 the reply will be RESP2
assert_equal $ret {$25}
@ -1082,7 +1167,7 @@ start_server {tags {"scripting resp3 needs:debug"}} {
}
test {test resp3 true protocol parsing} {
set ret [r eval "redis.setresp($i);return redis.call('debug', 'protocol', 'true')" 0]
set ret [run_script "redis.setresp($i);return redis.call('debug', 'protocol', 'true')" 0]
if {$client_proto == 2 || $i == 2} {
# if either Lua or the clien is RESP2 the reply will be RESP2
assert_equal $ret {:1}
@ -1092,7 +1177,7 @@ start_server {tags {"scripting resp3 needs:debug"}} {
}
test {test resp3 false protocol parsing} {
set ret [r eval "redis.setresp($i);return redis.call('debug', 'protocol', 'false')" 0]
set ret [run_script "redis.setresp($i);return redis.call('debug', 'protocol', 'false')" 0]
if {$client_proto == 2 || $i == 2} {
# if either Lua or the clien is RESP2 the reply will be RESP2
assert_equal $ret {:0}
@ -1109,8 +1194,9 @@ start_server {tags {"scripting resp3 needs:debug"}} {
test {test resp3 attribute protocol parsing} {
# attributes are not (yet) expose to the script
# So here we just check the parser handles them and they are ignored.
r eval "redis.setresp(3);return redis.call('debug', 'protocol', 'attrib')" 0
run_script "redis.setresp(3);return redis.call('debug', 'protocol', 'attrib')" 0
} {Some real reply following the attribute}
r debug set-disable-deny-scripts 0
}
} ;# foreach is_eval