diff --git a/redis.conf b/redis.conf index a7bfec83e..e8eff2774 100644 --- a/redis.conf +++ b/redis.conf @@ -1623,8 +1623,9 @@ latency-monitor-threshold 0 # x Expired events (events generated every time a key expires) # e Evicted events (events generated when a key is evicted for maxmemory) # t Stream commands +# d Module key type events # m Key-miss events (Note: It is not included in the 'A' class) -# A Alias for g$lshzxet, so that the "AKE" string means all the events +# A Alias for g$lshzxetd, so that the "AKE" string means all the events # (Except key-miss events which are excluded from 'A' due to their # unique nature). # diff --git a/src/module.c b/src/module.c index 4cc98bc19..05bf3a275 100644 --- a/src/module.c +++ b/src/module.c @@ -5851,6 +5851,7 @@ void moduleReleaseGIL(void) { * - REDISMODULE_NOTIFY_EXPIRED: Expiration events * - REDISMODULE_NOTIFY_EVICTED: Eviction events * - REDISMODULE_NOTIFY_STREAM: Stream events + * - REDISMODULE_NOTIFY_MODULE: Module types events * - REDISMODULE_NOTIFY_KEYMISS: Key-miss events * - REDISMODULE_NOTIFY_ALL: All events (Excluding REDISMODULE_NOTIFY_KEYMISS) * - REDISMODULE_NOTIFY_LOADED: A special notification available only for modules, diff --git a/src/notify.c b/src/notify.c index 5c7634bce..afaddbfca 100644 --- a/src/notify.c +++ b/src/notify.c @@ -56,6 +56,7 @@ int keyspaceEventsStringToFlags(char *classes) { case 'E': flags |= NOTIFY_KEYEVENT; break; case 't': flags |= NOTIFY_STREAM; break; case 'm': flags |= NOTIFY_KEY_MISS; break; + case 'd': flags |= NOTIFY_MODULE; break; default: return -1; } } @@ -82,6 +83,7 @@ sds keyspaceEventsFlagsToString(int flags) { if (flags & NOTIFY_EXPIRED) res = sdscatlen(res,"x",1); if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1); if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1); + if (flags & NOTIFY_MODULE) res = sdscatlen(res,"d",1); } if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1); if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1); diff --git a/src/redismodule.h b/src/redismodule.h index fba810de4..5520ca3cc 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -160,13 +160,14 @@ This flag should not be used directly by the module. #define REDISMODULE_NOTIFY_STREAM (1<<10) /* t */ #define REDISMODULE_NOTIFY_KEY_MISS (1<<11) /* m (Note: This one is excluded from REDISMODULE_NOTIFY_ALL on purpose) */ #define REDISMODULE_NOTIFY_LOADED (1<<12) /* module only key space notification, indicate a key loaded from rdb */ +#define REDISMODULE_NOTIFY_MODULE (1<<13) /* d, module key space notification */ /* Next notification flag, must be updated when adding new flags above! This flag should not be used directly by the module. * Use RedisModule_GetKeyspaceNotificationFlagsAll instead. */ -#define _REDISMODULE_NOTIFY_NEXT (1<<13) +#define _REDISMODULE_NOTIFY_NEXT (1<<14) -#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM) /* A */ +#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM | REDISMODULE_NOTIFY_MODULE) /* A */ /* A special pointer that we can use between the core and the module to signal * field deletion, and that is impossible to be a valid pointer. */ diff --git a/src/server.h b/src/server.h index 03746775c..d35eaa425 100644 --- a/src/server.h +++ b/src/server.h @@ -482,7 +482,8 @@ typedef enum { #define NOTIFY_STREAM (1<<10) /* t */ #define NOTIFY_KEY_MISS (1<<11) /* m (Note: This one is excluded from NOTIFY_ALL on purpose) */ #define NOTIFY_LOADED (1<<12) /* module only key space notification, indicate a key loaded from rdb */ -#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM) /* A flag */ +#define NOTIFY_MODULE (1<<13) /* d, module key space notification */ +#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM | NOTIFY_MODULE) /* A flag */ /* Get the first bind addr or NULL */ #define NET_FIRST_BIND_ADDR (server.bindaddr_count ? server.bindaddr[0] : NULL) diff --git a/tests/modules/keyspace_events.c b/tests/modules/keyspace_events.c index be259d738..9305774cd 100644 --- a/tests/modules/keyspace_events.c +++ b/tests/modules/keyspace_events.c @@ -38,6 +38,8 @@ /** strores all the keys on which we got 'loaded' keyspace notification **/ RedisModuleDict *loaded_event_log = NULL; +/** stores all the keys on which we got 'module' keyspace notification **/ +RedisModuleDict *module_event_log = NULL; static int KeySpace_NotificationLoaded(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){ REDISMODULE_NOT_USED(ctx); @@ -78,6 +80,50 @@ static int KeySpace_NotificationGeneric(RedisModuleCtx *ctx, int type, const cha return REDISMODULE_OK; } +static int KeySpace_NotificationModule(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(type); + REDISMODULE_NOT_USED(event); + + const char* keyName = RedisModule_StringPtrLen(key, NULL); + int nokey; + RedisModule_DictGetC(module_event_log, (void*)keyName, strlen(keyName), &nokey); + if(nokey){ + RedisModule_DictSetC(module_event_log, (void*)keyName, strlen(keyName), RedisModule_HoldString(ctx, key)); + } + return REDISMODULE_OK; +} + +static int cmdNotify(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){ + if(argc != 2){ + return RedisModule_WrongArity(ctx); + } + + RedisModule_NotifyKeyspaceEvent(ctx, REDISMODULE_NOTIFY_MODULE, "notify", argv[1]); + RedisModule_ReplyWithNull(ctx); + return REDISMODULE_OK; +} + +static int cmdIsModuleKeyNotified(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){ + if(argc != 2){ + return RedisModule_WrongArity(ctx); + } + + const char* key = RedisModule_StringPtrLen(argv[1], NULL); + + int nokey; + RedisModuleString* keyStr = RedisModule_DictGetC(module_event_log, (void*)key, strlen(key), &nokey); + + RedisModule_ReplyWithArray(ctx, 2); + RedisModule_ReplyWithLongLong(ctx, !nokey); + if(nokey){ + RedisModule_ReplyWithNull(ctx); + }else{ + RedisModule_ReplyWithString(ctx, keyStr); + } + return REDISMODULE_OK; +} + static int cmdIsKeyLoaded(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){ if(argc != 2){ return RedisModule_WrongArity(ctx); @@ -171,6 +217,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) } loaded_event_log = RedisModule_CreateDict(ctx); + module_event_log = RedisModule_CreateDict(ctx); int keySpaceAll = RedisModule_GetKeyspaceNotificationFlagsAll(); @@ -187,6 +234,18 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; } + if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_MODULE, KeySpace_NotificationModule) != REDISMODULE_OK){ + return REDISMODULE_ERR; + } + + if (RedisModule_CreateCommand(ctx,"keyspace.notify", cmdNotify,"",0,0,0) == REDISMODULE_ERR){ + return REDISMODULE_ERR; + } + + if (RedisModule_CreateCommand(ctx,"keyspace.is_module_key_notified", cmdIsModuleKeyNotified,"",0,0,0) == REDISMODULE_ERR){ + return REDISMODULE_ERR; + } + if (RedisModule_CreateCommand(ctx,"keyspace.is_key_loaded", cmdIsKeyLoaded,"",0,0,0) == REDISMODULE_ERR){ return REDISMODULE_ERR; } @@ -219,6 +278,16 @@ int RedisModule_OnUnload(RedisModuleCtx *ctx) { RedisModule_FreeString(ctx, val); } RedisModule_FreeDict(ctx, loaded_event_log); + RedisModule_DictIteratorStop(iter); loaded_event_log = NULL; + + iter = RedisModule_DictIteratorStartC(module_event_log, "^", NULL, 0); + while((key = RedisModule_DictNextC(iter, &keyLen, (void**)&val))){ + RedisModule_FreeString(ctx, val); + } + RedisModule_FreeDict(ctx, module_event_log); + RedisModule_DictIteratorStop(iter); + module_event_log = NULL; + return REDISMODULE_OK; } diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 318cdf871..b00aa159a 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -713,3 +713,52 @@ proc chi_square_value {res} { return $x2_value } + +#subscribe to Pub/Sub channels +proc consume_subscribe_messages {client type channels} { + set numsub -1 + set counts {} + + for {set i [llength $channels]} {$i > 0} {incr i -1} { + set msg [$client read] + assert_equal $type [lindex $msg 0] + + # when receiving subscribe messages the channels names + # are ordered. when receiving unsubscribe messages + # they are unordered + set idx [lsearch -exact $channels [lindex $msg 1]] + if {[string match "*unsubscribe" $type]} { + assert {$idx >= 0} + } else { + assert {$idx == 0} + } + set channels [lreplace $channels $idx $idx] + + # aggregate the subscription count to return to the caller + lappend counts [lindex $msg 2] + } + + # we should have received messages for channels + assert {[llength $channels] == 0} + return $counts +} + +proc subscribe {client channels} { + $client subscribe {*}$channels + consume_subscribe_messages $client subscribe $channels +} + +proc unsubscribe {client {channels {}}} { + $client unsubscribe {*}$channels + consume_subscribe_messages $client unsubscribe $channels +} + +proc psubscribe {client channels} { + $client psubscribe {*}$channels + consume_subscribe_messages $client psubscribe $channels +} + +proc punsubscribe {client {channels {}}} { + $client punsubscribe {*}$channels + consume_subscribe_messages $client punsubscribe $channels +} \ No newline at end of file diff --git a/tests/unit/moduleapi/keyspace_events.tcl b/tests/unit/moduleapi/keyspace_events.tcl index 293a62e4e..60800bbff 100644 --- a/tests/unit/moduleapi/keyspace_events.tcl +++ b/tests/unit/moduleapi/keyspace_events.tcl @@ -67,5 +67,20 @@ tags "modules" { assert_equal {1} [r get lua] r get x } {3} + + test {Test module key space event} { + r keyspace.notify x + assert_equal {1 x} [r keyspace.is_module_key_notified x] + } + + test "Keyspace notifications: module events test" { + r config set notify-keyspace-events Kd + r del x + set rd1 [redis_deferring_client] + assert_equal {1} [psubscribe $rd1 *] + r keyspace.notify x + assert_equal {pmessage * __keyspace@9__:x notify} [$rd1 read] + $rd1 close + } } } diff --git a/tests/unit/pubsub.tcl b/tests/unit/pubsub.tcl index 966565ae1..1906805a7 100644 --- a/tests/unit/pubsub.tcl +++ b/tests/unit/pubsub.tcl @@ -1,52 +1,4 @@ start_server {tags {"pubsub network"}} { - proc __consume_subscribe_messages {client type channels} { - set numsub -1 - set counts {} - - for {set i [llength $channels]} {$i > 0} {incr i -1} { - set msg [$client read] - assert_equal $type [lindex $msg 0] - - # when receiving subscribe messages the channels names - # are ordered. when receiving unsubscribe messages - # they are unordered - set idx [lsearch -exact $channels [lindex $msg 1]] - if {[string match "*unsubscribe" $type]} { - assert {$idx >= 0} - } else { - assert {$idx == 0} - } - set channels [lreplace $channels $idx $idx] - - # aggregate the subscription count to return to the caller - lappend counts [lindex $msg 2] - } - - # we should have received messages for channels - assert {[llength $channels] == 0} - return $counts - } - - proc subscribe {client channels} { - $client subscribe {*}$channels - __consume_subscribe_messages $client subscribe $channels - } - - proc unsubscribe {client {channels {}}} { - $client unsubscribe {*}$channels - __consume_subscribe_messages $client unsubscribe $channels - } - - proc psubscribe {client channels} { - $client psubscribe {*}$channels - __consume_subscribe_messages $client psubscribe $channels - } - - proc punsubscribe {client {channels {}}} { - $client punsubscribe {*}$channels - __consume_subscribe_messages $client punsubscribe $channels - } - test "Pub/Sub PING" { set rd1 [redis_deferring_client] subscribe $rd1 somechannel