Dual stack and client-specific IPs in cluster (#736)
New configs: * `cluster-announce-client-ipv4` * `cluster-announce-client-ipv6` New module API function: * `ValkeyModule_GetClusterNodeInfoForClient`, takes a client id and is otherwise just like its non-ForClient cousin. If configured, one of these IP addresses are reported to each client in CLUSTER SLOTS, CLUSTER SHARDS, CLUSTER NODES and redirects, replacing the IP (`custer-announce-ip` or the auto-detected IP) of each node. Which one is reported to the client depends on whether the client is connected over IPv4 or IPv6. Benefits: * This allows clients using IPv4 to get the IPv4 addresses of all cluster nodes and IPv6 clients to get the IPv6 clients. * This allows the IPs visible to clients to be different to the IPs used between the cluster nodes due to NAT'ing. The information is propagated in the cluster bus using new Ping extensions. (Old nodes without this feature ignore unknown Ping extensions.) This adds another dimension to CLUSTER SLOTS reply. It now depends on the client's use of TLS, the IP address family and RESP version. Refactoring: The cached connection type definition is moved from connection.h (it actually has nothing to do with the connection abstraction) to server.h and is changed to a bitmap, with one bit for each of TLS, IPv6 and RESP3. Fixes #337 --------- Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
This commit is contained in:
parent
6a5a11f21c
commit
a323dce890
@ -747,7 +747,7 @@ int verifyClusterNodeId(const char *name, int length) {
|
||||
}
|
||||
|
||||
int isValidAuxChar(int c) {
|
||||
return isalnum(c) || (strchr("!#$%&()*+:;<>?@[]^{|}~", c) == NULL);
|
||||
return isalnum(c) || (strchr("!#$%&()*+.:;<>?@[]^{|}~", c) == NULL);
|
||||
}
|
||||
|
||||
int isValidAuxString(char *s, unsigned int length) {
|
||||
@ -1194,7 +1194,7 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
|
||||
int port = clusterNodeClientPort(n, shouldReturnTlsInfo());
|
||||
addReplyErrorSds(c,
|
||||
sdscatprintf(sdsempty(), "-%s %d %s:%d", (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
|
||||
hashslot, clusterNodePreferredEndpoint(n), port));
|
||||
hashslot, clusterNodePreferredEndpoint(n, c), port));
|
||||
} else {
|
||||
serverPanic("getNodeByQuery() unknown error.");
|
||||
}
|
||||
@ -1267,7 +1267,7 @@ void addNodeToNodeReply(client *c, clusterNode *node) {
|
||||
char *hostname = clusterNodeHostname(node);
|
||||
addReplyArrayLen(c, 4);
|
||||
if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_IP) {
|
||||
addReplyBulkCString(c, clusterNodeIp(node));
|
||||
addReplyBulkCString(c, clusterNodeIp(node, c));
|
||||
} else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_HOSTNAME) {
|
||||
if (hostname != NULL && hostname[0] != '\0') {
|
||||
addReplyBulkCString(c, hostname);
|
||||
@ -1300,7 +1300,7 @@ void addNodeToNodeReply(client *c, clusterNode *node) {
|
||||
|
||||
if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) {
|
||||
addReplyBulkCString(c, "ip");
|
||||
addReplyBulkCString(c, clusterNodeIp(node));
|
||||
addReplyBulkCString(c, clusterNodeIp(node, c));
|
||||
length--;
|
||||
}
|
||||
if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME && hostname != NULL &&
|
||||
@ -1353,12 +1353,10 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in
|
||||
}
|
||||
|
||||
void clearCachedClusterSlotsResponse(void) {
|
||||
for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
|
||||
for (int resp = 0; resp <= 3; resp++) {
|
||||
if (server.cached_cluster_slot_info[conn_type][resp]) {
|
||||
sdsfree(server.cached_cluster_slot_info[conn_type][resp]);
|
||||
server.cached_cluster_slot_info[conn_type][resp] = NULL;
|
||||
}
|
||||
for (int conn_type = 0; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
|
||||
if (server.cached_cluster_slot_info[conn_type]) {
|
||||
sdsfree(server.cached_cluster_slot_info[conn_type]);
|
||||
server.cached_cluster_slot_info[conn_type] = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1415,14 +1413,17 @@ void clusterCommandSlots(client *c) {
|
||||
* 3) node ID
|
||||
* ... continued until done
|
||||
*/
|
||||
connTypeForCaching conn_type = shouldReturnTlsInfo();
|
||||
int conn_type = 0;
|
||||
if (connIsTLS(c->conn)) conn_type |= CACHE_CONN_TYPE_TLS;
|
||||
if (isClientConnIpV6(c)) conn_type |= CACHE_CONN_TYPE_IPv6;
|
||||
if (c->resp == 3) conn_type |= CACHE_CONN_TYPE_RESP3;
|
||||
|
||||
if (detectAndUpdateCachedNodeHealth()) clearCachedClusterSlotsResponse();
|
||||
|
||||
sds cached_reply = server.cached_cluster_slot_info[conn_type][c->resp];
|
||||
sds cached_reply = server.cached_cluster_slot_info[conn_type];
|
||||
if (!cached_reply) {
|
||||
cached_reply = generateClusterSlotResponse(c->resp);
|
||||
server.cached_cluster_slot_info[conn_type][c->resp] = cached_reply;
|
||||
server.cached_cluster_slot_info[conn_type] = cached_reply;
|
||||
} else {
|
||||
debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(cached_reply, c->resp) == 1);
|
||||
}
|
||||
|
@ -48,6 +48,8 @@ int clusterSendModuleMessageToTarget(const char *target,
|
||||
|
||||
void clusterUpdateMyselfFlags(void);
|
||||
void clusterUpdateMyselfIp(void);
|
||||
void clusterUpdateMyselfClientIpV4(void);
|
||||
void clusterUpdateMyselfClientIpV6(void);
|
||||
void clusterUpdateMyselfHostname(void);
|
||||
void clusterUpdateMyselfAnnouncedPorts(void);
|
||||
void clusterUpdateMyselfHumanNodename(void);
|
||||
@ -85,7 +87,7 @@ int handleDebugClusterCommand(client *c);
|
||||
int clusterNodePending(clusterNode *node);
|
||||
int clusterNodeIsPrimary(clusterNode *n);
|
||||
char **getClusterNodesList(size_t *numnodes);
|
||||
char *clusterNodeIp(clusterNode *node);
|
||||
char *clusterNodeIp(clusterNode *node, client *c);
|
||||
int clusterNodeIsReplica(clusterNode *node);
|
||||
clusterNode *clusterNodeGetPrimary(clusterNode *node);
|
||||
char *clusterNodeGetName(clusterNode *node);
|
||||
@ -100,7 +102,7 @@ clusterNode *getImportingSlotSource(int slot);
|
||||
clusterNode *getNodeBySlot(int slot);
|
||||
int clusterNodeClientPort(clusterNode *n, int use_tls);
|
||||
char *clusterNodeHostname(clusterNode *node);
|
||||
const char *clusterNodePreferredEndpoint(clusterNode *n);
|
||||
const char *clusterNodePreferredEndpoint(clusterNode *n, client *c);
|
||||
long long clusterNodeReplOffset(clusterNode *node);
|
||||
clusterNode *clusterLookupNode(const char *name, int length);
|
||||
int detectAndUpdateCachedNodeHealth(void);
|
||||
|
@ -98,16 +98,22 @@ unsigned int delKeysInSlot(unsigned int hashslot);
|
||||
void clusterAddNodeToShard(const char *shard_id, clusterNode *node);
|
||||
list *clusterLookupNodeListByShardId(const char *shard_id);
|
||||
void clusterRemoveNodeFromShard(clusterNode *node);
|
||||
int auxShardIdSetter(clusterNode *n, void *value, int length);
|
||||
int auxShardIdSetter(clusterNode *n, void *value, size_t length);
|
||||
sds auxShardIdGetter(clusterNode *n, sds s);
|
||||
int auxShardIdPresent(clusterNode *n);
|
||||
int auxHumanNodenameSetter(clusterNode *n, void *value, int length);
|
||||
int auxHumanNodenameSetter(clusterNode *n, void *value, size_t length);
|
||||
sds auxHumanNodenameGetter(clusterNode *n, sds s);
|
||||
int auxHumanNodenamePresent(clusterNode *n);
|
||||
int auxTcpPortSetter(clusterNode *n, void *value, int length);
|
||||
int auxAnnounceClientIpV4Setter(clusterNode *n, void *value, size_t length);
|
||||
sds auxAnnounceClientIpV4Getter(clusterNode *n, sds s);
|
||||
int auxAnnounceClientIpV4Present(clusterNode *n);
|
||||
int auxAnnounceClientIpV6Setter(clusterNode *n, void *value, size_t length);
|
||||
sds auxAnnounceClientIpV6Getter(clusterNode *n, sds s);
|
||||
int auxAnnounceClientIpV6Present(clusterNode *n);
|
||||
int auxTcpPortSetter(clusterNode *n, void *value, size_t length);
|
||||
sds auxTcpPortGetter(clusterNode *n, sds s);
|
||||
int auxTcpPortPresent(clusterNode *n);
|
||||
int auxTlsPortSetter(clusterNode *n, void *value, int length);
|
||||
int auxTlsPortSetter(clusterNode *n, void *value, size_t length);
|
||||
sds auxTlsPortGetter(clusterNode *n, sds s);
|
||||
int auxTlsPortPresent(clusterNode *n);
|
||||
static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen);
|
||||
@ -190,7 +196,7 @@ dictType clusterSdsToListType = {
|
||||
|
||||
/* Aux field setter function prototype
|
||||
* return C_OK when the update is successful; C_ERR otherwise */
|
||||
typedef int(aux_value_setter)(clusterNode *n, void *value, int length);
|
||||
typedef int(aux_value_setter)(clusterNode *n, void *value, size_t length);
|
||||
/* Aux field getter function prototype
|
||||
* return an sds that is a concatenation of the input sds string and
|
||||
* the aux value */
|
||||
@ -211,7 +217,9 @@ typedef enum {
|
||||
af_human_nodename,
|
||||
af_tcp_port,
|
||||
af_tls_port,
|
||||
af_count,
|
||||
af_announce_client_ipv4,
|
||||
af_announce_client_ipv6,
|
||||
af_count, /* must be the last field */
|
||||
} auxFieldIndex;
|
||||
|
||||
/* Note that
|
||||
@ -223,9 +231,11 @@ auxFieldHandler auxFieldHandlers[] = {
|
||||
{"nodename", auxHumanNodenameSetter, auxHumanNodenameGetter, auxHumanNodenamePresent},
|
||||
{"tcp-port", auxTcpPortSetter, auxTcpPortGetter, auxTcpPortPresent},
|
||||
{"tls-port", auxTlsPortSetter, auxTlsPortGetter, auxTlsPortPresent},
|
||||
{"client-ipv4", auxAnnounceClientIpV4Setter, auxAnnounceClientIpV4Getter, auxAnnounceClientIpV4Present},
|
||||
{"client-ipv6", auxAnnounceClientIpV6Setter, auxAnnounceClientIpV6Getter, auxAnnounceClientIpV6Present},
|
||||
};
|
||||
|
||||
int auxShardIdSetter(clusterNode *n, void *value, int length) {
|
||||
int auxShardIdSetter(clusterNode *n, void *value, size_t length) {
|
||||
if (verifyClusterNodeId(value, length) == C_ERR) {
|
||||
return C_ERR;
|
||||
}
|
||||
@ -249,19 +259,12 @@ int auxShardIdPresent(clusterNode *n) {
|
||||
return strlen(n->shard_id);
|
||||
}
|
||||
|
||||
int auxHumanNodenameSetter(clusterNode *n, void *value, int length) {
|
||||
if (n && !strncmp(value, n->human_nodename, length)) {
|
||||
return C_OK;
|
||||
} else if (!n && (length == 0)) {
|
||||
int auxHumanNodenameSetter(clusterNode *n, void *value, size_t length) {
|
||||
if (sdslen(n->human_nodename) == length && !strncmp(value, n->human_nodename, length)) {
|
||||
return C_OK;
|
||||
}
|
||||
if (n) {
|
||||
n->human_nodename = sdscpylen(n->human_nodename, value, length);
|
||||
} else if (sdslen(n->human_nodename) != 0) {
|
||||
sdsclear(n->human_nodename);
|
||||
} else {
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
n->human_nodename = sdscpylen(n->human_nodename, value, length);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
@ -273,7 +276,59 @@ int auxHumanNodenamePresent(clusterNode *n) {
|
||||
return sdslen(n->human_nodename);
|
||||
}
|
||||
|
||||
int auxTcpPortSetter(clusterNode *n, void *value, int length) {
|
||||
int auxAnnounceClientIpV4Setter(clusterNode *n, void *value, size_t length) {
|
||||
if (sdslen(n->announce_client_ipv4) == length && !strncmp(value, n->announce_client_ipv4, length)) {
|
||||
/* Unchanged value */
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
if (length != 0) {
|
||||
/* Validate IPv4 address */
|
||||
struct sockaddr_in sa;
|
||||
if (inet_pton(AF_INET, (const char *)value, &(sa.sin_addr)) == 0) {
|
||||
return C_ERR;
|
||||
}
|
||||
}
|
||||
|
||||
n->announce_client_ipv4 = sdscpylen(n->announce_client_ipv4, value, length);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
sds auxAnnounceClientIpV4Getter(clusterNode *n, sds s) {
|
||||
return sdscatprintf(s, "%s", n->announce_client_ipv4);
|
||||
}
|
||||
|
||||
int auxAnnounceClientIpV4Present(clusterNode *n) {
|
||||
return sdslen(n->announce_client_ipv4) != 0;
|
||||
}
|
||||
|
||||
int auxAnnounceClientIpV6Setter(clusterNode *n, void *value, size_t length) {
|
||||
if (sdslen(n->announce_client_ipv6) == length && !strncmp(value, n->announce_client_ipv6, length)) {
|
||||
/* Unchanged value */
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
if (length != 0) {
|
||||
/* Validate IPv6 address */
|
||||
struct sockaddr_in6 sa;
|
||||
if (inet_pton(AF_INET6, (const char *)value, &(sa.sin6_addr)) == 0) {
|
||||
return C_ERR;
|
||||
}
|
||||
}
|
||||
|
||||
n->announce_client_ipv6 = sdscpylen(n->announce_client_ipv6, value, length);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
sds auxAnnounceClientIpV6Getter(clusterNode *n, sds s) {
|
||||
return sdscatprintf(s, "%s", n->announce_client_ipv6);
|
||||
}
|
||||
|
||||
int auxAnnounceClientIpV6Present(clusterNode *n) {
|
||||
return sdslen(n->announce_client_ipv6) != 0;
|
||||
}
|
||||
|
||||
int auxTcpPortSetter(clusterNode *n, void *value, size_t length) {
|
||||
if (length > 5 || length < 1) {
|
||||
return C_ERR;
|
||||
}
|
||||
@ -292,7 +347,7 @@ int auxTcpPortPresent(clusterNode *n) {
|
||||
return n->tcp_port >= 0 && n->tcp_port < 65536;
|
||||
}
|
||||
|
||||
int auxTlsPortSetter(clusterNode *n, void *value, int length) {
|
||||
int auxTlsPortSetter(clusterNode *n, void *value, size_t length) {
|
||||
if (length > 5 || length < 1) {
|
||||
return C_ERR;
|
||||
}
|
||||
@ -885,38 +940,37 @@ void clusterUpdateMyselfIp(void) {
|
||||
}
|
||||
}
|
||||
|
||||
static void updateSdsExtensionField(char **field, const char *value) {
|
||||
if (value != NULL && !strcmp(value, *field)) {
|
||||
return;
|
||||
} else if (value == NULL && sdslen(*field) == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (value != NULL) {
|
||||
*field = sdscpy(*field, value);
|
||||
} else {
|
||||
sdsclear(*field);
|
||||
}
|
||||
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
|
||||
}
|
||||
|
||||
/* Update the hostname for the specified node with the provided C string. */
|
||||
static void updateAnnouncedHostname(clusterNode *node, char *new) {
|
||||
/* Previous and new hostname are the same, no need to update. */
|
||||
if (new && !strcmp(new, node->hostname)) {
|
||||
return;
|
||||
} else if (!new && (sdslen(node->hostname) == 0)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (new) {
|
||||
node->hostname = sdscpy(node->hostname, new);
|
||||
} else if (sdslen(node->hostname) != 0) {
|
||||
sdsclear(node->hostname);
|
||||
}
|
||||
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
|
||||
static void updateAnnouncedHostname(clusterNode *node, char *value) {
|
||||
updateSdsExtensionField(&node->hostname, value);
|
||||
}
|
||||
|
||||
static void updateAnnouncedHumanNodename(clusterNode *node, char *new) {
|
||||
if (new && !strcmp(new, node->human_nodename)) {
|
||||
return;
|
||||
} else if (!new && (sdslen(node->human_nodename) == 0)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (new) {
|
||||
node->human_nodename = sdscpy(node->human_nodename, new);
|
||||
} else if (sdslen(node->human_nodename) != 0) {
|
||||
sdsclear(node->human_nodename);
|
||||
}
|
||||
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
|
||||
static void updateAnnouncedHumanNodename(clusterNode *node, char *value) {
|
||||
updateSdsExtensionField(&node->human_nodename, value);
|
||||
}
|
||||
|
||||
static void updateAnnouncedClientIpV4(clusterNode *node, char *value) {
|
||||
updateSdsExtensionField(&node->announce_client_ipv4, value);
|
||||
}
|
||||
|
||||
static void updateAnnouncedClientIpV6(clusterNode *node, char *value) {
|
||||
updateSdsExtensionField(&node->announce_client_ipv6, value);
|
||||
}
|
||||
|
||||
static void updateShardId(clusterNode *node, const char *shard_id) {
|
||||
if (shard_id && memcmp(node->shard_id, shard_id, CLUSTER_NAMELEN) != 0) {
|
||||
@ -956,6 +1010,16 @@ void clusterUpdateMyselfHumanNodename(void) {
|
||||
updateAnnouncedHumanNodename(myself, server.cluster_announce_human_nodename);
|
||||
}
|
||||
|
||||
void clusterUpdateMyselfClientIpV4(void) {
|
||||
if (!myself) return;
|
||||
updateAnnouncedClientIpV4(myself, server.cluster_announce_client_ipv4);
|
||||
}
|
||||
|
||||
void clusterUpdateMyselfClientIpV6(void) {
|
||||
if (!myself) return;
|
||||
updateAnnouncedClientIpV6(myself, server.cluster_announce_client_ipv6);
|
||||
}
|
||||
|
||||
void clusterInit(void) {
|
||||
int saveconf = 0;
|
||||
|
||||
@ -1033,14 +1097,14 @@ void clusterInit(void) {
|
||||
|
||||
server.cluster->mf_end = 0;
|
||||
server.cluster->mf_replica = NULL;
|
||||
for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
|
||||
for (int resp = 0; resp <= 3; resp++) {
|
||||
server.cached_cluster_slot_info[conn_type][resp] = NULL;
|
||||
}
|
||||
for (int conn_type = 0; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
|
||||
server.cached_cluster_slot_info[conn_type] = NULL;
|
||||
}
|
||||
resetManualFailover();
|
||||
clusterUpdateMyselfFlags();
|
||||
clusterUpdateMyselfIp();
|
||||
clusterUpdateMyselfClientIpV4();
|
||||
clusterUpdateMyselfClientIpV6();
|
||||
clusterUpdateMyselfHostname();
|
||||
clusterUpdateMyselfHumanNodename();
|
||||
}
|
||||
@ -1344,6 +1408,8 @@ clusterNode *createClusterNode(char *nodename, int flags) {
|
||||
node->link = NULL;
|
||||
node->inbound_link = NULL;
|
||||
memset(node->ip, 0, sizeof(node->ip));
|
||||
node->announce_client_ipv4 = sdsempty();
|
||||
node->announce_client_ipv6 = sdsempty();
|
||||
node->hostname = sdsempty();
|
||||
node->human_nodename = sdsempty();
|
||||
node->tcp_port = 0;
|
||||
@ -1515,6 +1581,8 @@ void freeClusterNode(clusterNode *n) {
|
||||
sdsfree(nodename);
|
||||
sdsfree(n->hostname);
|
||||
sdsfree(n->human_nodename);
|
||||
sdsfree(n->announce_client_ipv4);
|
||||
sdsfree(n->announce_client_ipv6);
|
||||
|
||||
/* Release links and associated data structures. */
|
||||
if (n->link) freeClusterLink(n->link);
|
||||
@ -2555,45 +2623,49 @@ static clusterMsgPingExt *getNextPingExt(clusterMsgPingExt *ext) {
|
||||
}
|
||||
|
||||
/* All PING extensions must be 8-byte aligned */
|
||||
uint32_t getAlignedPingExtSize(uint32_t dataSize) {
|
||||
static uint32_t getAlignedPingExtSize(uint32_t dataSize) {
|
||||
return sizeof(clusterMsgPingExt) + EIGHT_BYTE_ALIGN(dataSize);
|
||||
}
|
||||
|
||||
uint32_t getHostnamePingExtSize(void) {
|
||||
if (sdslen(myself->hostname) == 0) {
|
||||
return 0;
|
||||
}
|
||||
return getAlignedPingExtSize(sdslen(myself->hostname) + 1);
|
||||
}
|
||||
|
||||
uint32_t getHumanNodenamePingExtSize(void) {
|
||||
if (sdslen(myself->human_nodename) == 0) {
|
||||
return 0;
|
||||
}
|
||||
return getAlignedPingExtSize(sdslen(myself->human_nodename) + 1);
|
||||
}
|
||||
|
||||
uint32_t getShardIdPingExtSize(void) {
|
||||
static uint32_t getShardIdPingExtSize(void) {
|
||||
return getAlignedPingExtSize(sizeof(clusterMsgPingExtShardId));
|
||||
}
|
||||
|
||||
uint32_t getForgottenNodeExtSize(void) {
|
||||
static uint32_t getForgottenNodeExtSize(void) {
|
||||
return getAlignedPingExtSize(sizeof(clusterMsgPingExtForgottenNode));
|
||||
}
|
||||
|
||||
void *preparePingExt(clusterMsgPingExt *ext, uint16_t type, uint32_t length) {
|
||||
static void *preparePingExt(clusterMsgPingExt *ext, uint16_t type, uint32_t length) {
|
||||
ext->type = htons(type);
|
||||
ext->length = htonl(length);
|
||||
return &ext->ext[0];
|
||||
}
|
||||
|
||||
/* If value is nonempty and cursor_ptr points to a non-NULL cursor, writes a
|
||||
* ping extension at the cursor, advances the cursor, increments totlen and
|
||||
* returns 1. If value is nonempty and cursor_ptr points to NULL, just computes
|
||||
* the size, increments totlen and returns 1. If value is empty, returns 0. */
|
||||
static uint32_t
|
||||
writeSdsPingExtIfNonempty(uint32_t *totlen_ptr, clusterMsgPingExt **cursor_ptr, clusterMsgPingtypes type, sds value) {
|
||||
size_t len = sdslen(value);
|
||||
if (len == 0) return 0;
|
||||
size_t size = getAlignedPingExtSize(len + 1);
|
||||
if (*cursor_ptr != NULL) {
|
||||
void *ext = preparePingExt(*cursor_ptr, type, size);
|
||||
memcpy(ext, value, len);
|
||||
*cursor_ptr = getNextPingExt(*cursor_ptr);
|
||||
}
|
||||
*totlen_ptr += size;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* 1. If a NULL hdr is provided, compute the extension size;
|
||||
* 2. If a non-NULL hdr is provided, write the hostname ping
|
||||
* extension at the start of the cursor. This function
|
||||
* 2. If a non-NULL hdr is provided, write the ping
|
||||
* extensions at the start of the cursor. This function
|
||||
* will update the cursor to point to the end of the
|
||||
* written extension and will return the amount of bytes
|
||||
* written. */
|
||||
uint32_t writePingExt(clusterMsg *hdr, int gossipcount) {
|
||||
static uint32_t writePingExtensions(clusterMsg *hdr, int gossipcount) {
|
||||
uint16_t extensions = 0;
|
||||
uint32_t totlen = 0;
|
||||
clusterMsgPingExt *cursor = NULL;
|
||||
@ -2602,36 +2674,14 @@ uint32_t writePingExt(clusterMsg *hdr, int gossipcount) {
|
||||
cursor = getInitialPingExt(hdr, gossipcount);
|
||||
}
|
||||
|
||||
/* hostname is optional */
|
||||
if (sdslen(myself->hostname) != 0) {
|
||||
if (cursor != NULL) {
|
||||
/* Populate hostname */
|
||||
clusterMsgPingExtHostname *ext =
|
||||
preparePingExt(cursor, CLUSTERMSG_EXT_TYPE_HOSTNAME, getHostnamePingExtSize());
|
||||
memcpy(ext->hostname, myself->hostname, sdslen(myself->hostname));
|
||||
|
||||
/* Move the write cursor */
|
||||
cursor = getNextPingExt(cursor);
|
||||
}
|
||||
|
||||
totlen += getHostnamePingExtSize();
|
||||
extensions++;
|
||||
}
|
||||
|
||||
if (sdslen(myself->human_nodename) != 0) {
|
||||
if (cursor != NULL) {
|
||||
/* Populate human_nodename */
|
||||
clusterMsgPingExtHumanNodename *ext =
|
||||
preparePingExt(cursor, CLUSTERMSG_EXT_TYPE_HUMAN_NODENAME, getHumanNodenamePingExtSize());
|
||||
memcpy(ext->human_nodename, myself->human_nodename, sdslen(myself->human_nodename));
|
||||
|
||||
/* Move the write cursor */
|
||||
cursor = getNextPingExt(cursor);
|
||||
}
|
||||
|
||||
totlen += getHumanNodenamePingExtSize();
|
||||
extensions++;
|
||||
}
|
||||
/* Write simple optional SDS ping extensions. */
|
||||
extensions += writeSdsPingExtIfNonempty(&totlen, &cursor, CLUSTERMSG_EXT_TYPE_HOSTNAME, myself->hostname);
|
||||
extensions +=
|
||||
writeSdsPingExtIfNonempty(&totlen, &cursor, CLUSTERMSG_EXT_TYPE_HUMAN_NODENAME, myself->human_nodename);
|
||||
extensions +=
|
||||
writeSdsPingExtIfNonempty(&totlen, &cursor, CLUSTERMSG_EXT_TYPE_CLIENT_IPV4, myself->announce_client_ipv4);
|
||||
extensions +=
|
||||
writeSdsPingExtIfNonempty(&totlen, &cursor, CLUSTERMSG_EXT_TYPE_CLIENT_IPV6, myself->announce_client_ipv6);
|
||||
|
||||
/* Gossip forgotten nodes */
|
||||
if (dictSize(server.cluster->nodes_black_list) > 0) {
|
||||
@ -2681,6 +2731,8 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
|
||||
clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender, CLUSTER_NAMELEN);
|
||||
char *ext_hostname = NULL;
|
||||
char *ext_humannodename = NULL;
|
||||
char *ext_clientipv4 = NULL;
|
||||
char *ext_clientipv6 = NULL;
|
||||
char *ext_shardid = NULL;
|
||||
uint16_t extensions = ntohs(hdr->extensions);
|
||||
/* Loop through all the extensions and process them */
|
||||
@ -2694,6 +2746,14 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
|
||||
clusterMsgPingExtHumanNodename *humannodename_ext =
|
||||
(clusterMsgPingExtHumanNodename *)&(ext->ext[0].human_nodename);
|
||||
ext_humannodename = humannodename_ext->human_nodename;
|
||||
} else if (type == CLUSTERMSG_EXT_TYPE_CLIENT_IPV4) {
|
||||
clusterMsgPingExtClientIpV4 *clientipv4_ext =
|
||||
(clusterMsgPingExtClientIpV4 *)&(ext->ext[0].announce_client_ipv4);
|
||||
ext_clientipv4 = clientipv4_ext->announce_client_ipv4;
|
||||
} else if (type == CLUSTERMSG_EXT_TYPE_CLIENT_IPV6) {
|
||||
clusterMsgPingExtClientIpV6 *clientipv6_ext =
|
||||
(clusterMsgPingExtClientIpV6 *)&(ext->ext[0].announce_client_ipv6);
|
||||
ext_clientipv6 = clientipv6_ext->announce_client_ipv6;
|
||||
} else if (type == CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE) {
|
||||
clusterMsgPingExtForgottenNode *forgotten_node_ext = &(ext->ext[0].forgotten_node);
|
||||
clusterNode *n = clusterLookupNode(forgotten_node_ext->name, CLUSTER_NAMELEN);
|
||||
@ -2722,6 +2782,8 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
|
||||
* set it now. */
|
||||
updateAnnouncedHostname(sender, ext_hostname);
|
||||
updateAnnouncedHumanNodename(sender, ext_humannodename);
|
||||
updateAnnouncedClientIpV4(sender, ext_clientipv4);
|
||||
updateAnnouncedClientIpV6(sender, ext_clientipv6);
|
||||
/* If the node did not send us a shard-id extension, it means the sender
|
||||
* does not support it (old version), node->shard_id is randomly generated.
|
||||
* A cluster-wide consensus for the node's shard_id is not necessary.
|
||||
@ -3681,7 +3743,7 @@ void clusterSendPing(clusterLink *link, int type) {
|
||||
estlen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
|
||||
estlen += (sizeof(clusterMsgDataGossip) * (wanted + pfail_wanted));
|
||||
if (link->node && nodeSupportsExtensions(link->node)) {
|
||||
estlen += writePingExt(NULL, 0);
|
||||
estlen += writePingExtensions(NULL, 0);
|
||||
}
|
||||
/* Note: clusterBuildMessageHdr() expects the buffer to be always at least
|
||||
* sizeof(clusterMsg) or more. */
|
||||
@ -3752,7 +3814,7 @@ void clusterSendPing(clusterLink *link, int type) {
|
||||
uint32_t totlen = 0;
|
||||
|
||||
if (link->node && nodeSupportsExtensions(link->node)) {
|
||||
totlen += writePingExt(hdr, gossipcount);
|
||||
totlen += writePingExtensions(hdr, gossipcount);
|
||||
} else {
|
||||
serverLog(LL_DEBUG, "Unable to send extensions data, however setting ext data flag to true");
|
||||
hdr->mflags[0] |= CLUSTERMSG_FLAG0_EXT_DATA;
|
||||
@ -5248,15 +5310,19 @@ sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pairs_cou
|
||||
/* Generate a csv-alike representation of the specified cluster node.
|
||||
* See clusterGenNodesDescription() top comment for more information.
|
||||
*
|
||||
* If a client is provided, we're creating a reply to the CLUSTER NODES command.
|
||||
* If client is NULL, we are creating the content of nodes.conf.
|
||||
*
|
||||
* The function returns the string representation as an SDS string. */
|
||||
sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary) {
|
||||
int j, start;
|
||||
sds ci;
|
||||
int port = clusterNodeClientPort(node, tls_primary);
|
||||
char *ip = clusterNodeIp(node, c);
|
||||
|
||||
/* Node coordinates */
|
||||
ci = sdscatlen(sdsempty(), node->name, CLUSTER_NAMELEN);
|
||||
ci = sdscatfmt(ci, " %s:%i@%i", node->ip, port, node->cport);
|
||||
ci = sdscatfmt(ci, " %s:%i@%i", ip, port, node->cport);
|
||||
if (sdslen(node->hostname) != 0) {
|
||||
ci = sdscatfmt(ci, ",%s", node->hostname);
|
||||
}
|
||||
@ -5571,11 +5637,11 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) {
|
||||
}
|
||||
|
||||
addReplyBulkCString(c, "ip");
|
||||
addReplyBulkCString(c, node->ip);
|
||||
addReplyBulkCString(c, clusterNodeIp(node, c));
|
||||
reply_count++;
|
||||
|
||||
addReplyBulkCString(c, "endpoint");
|
||||
addReplyBulkCString(c, clusterNodePreferredEndpoint(node));
|
||||
addReplyBulkCString(c, clusterNodePreferredEndpoint(node, c));
|
||||
reply_count++;
|
||||
|
||||
if (sdslen(node->hostname) != 0) {
|
||||
@ -5844,7 +5910,16 @@ int clusterNodePending(clusterNode *node) {
|
||||
return node->flags & (CLUSTER_NODE_NOADDR | CLUSTER_NODE_HANDSHAKE);
|
||||
}
|
||||
|
||||
char *clusterNodeIp(clusterNode *node) {
|
||||
/* Returns the IP of the node as seen by the given client, or by the cluster node if c is NULL. */
|
||||
char *clusterNodeIp(clusterNode *node, client *c) {
|
||||
if (c == NULL) {
|
||||
return node->ip;
|
||||
}
|
||||
if (isClientConnIpV6(c)) {
|
||||
if (sdslen(node->announce_client_ipv6) != 0) return node->announce_client_ipv6;
|
||||
} else {
|
||||
if (sdslen(node->announce_client_ipv4) != 0) return node->announce_client_ipv4;
|
||||
}
|
||||
return node->ip;
|
||||
}
|
||||
|
||||
@ -6509,10 +6584,10 @@ long long clusterNodeReplOffset(clusterNode *node) {
|
||||
return node->repl_offset;
|
||||
}
|
||||
|
||||
const char *clusterNodePreferredEndpoint(clusterNode *n) {
|
||||
const char *clusterNodePreferredEndpoint(clusterNode *n, client *c) {
|
||||
char *hostname = clusterNodeHostname(n);
|
||||
switch (server.cluster_preferred_endpoint_type) {
|
||||
case CLUSTER_ENDPOINT_TYPE_IP: return clusterNodeIp(n);
|
||||
case CLUSTER_ENDPOINT_TYPE_IP: return clusterNodeIp(n, c);
|
||||
case CLUSTER_ENDPOINT_TYPE_HOSTNAME: return (hostname != NULL && hostname[0] != '\0') ? hostname : "?";
|
||||
case CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT: return "";
|
||||
}
|
||||
|
@ -138,6 +138,8 @@ typedef enum {
|
||||
CLUSTERMSG_EXT_TYPE_HUMAN_NODENAME,
|
||||
CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE,
|
||||
CLUSTERMSG_EXT_TYPE_SHARDID,
|
||||
CLUSTERMSG_EXT_TYPE_CLIENT_IPV4,
|
||||
CLUSTERMSG_EXT_TYPE_CLIENT_IPV6,
|
||||
} clusterMsgPingtypes;
|
||||
|
||||
/* Helper function for making sure extensions are eight byte aligned. */
|
||||
@ -162,6 +164,14 @@ typedef struct {
|
||||
char shard_id[CLUSTER_NAMELEN]; /* The shard_id, 40 bytes fixed. */
|
||||
} clusterMsgPingExtShardId;
|
||||
|
||||
typedef struct {
|
||||
char announce_client_ipv4[1]; /* Announced client IPv4, ends with \0. */
|
||||
} clusterMsgPingExtClientIpV4;
|
||||
|
||||
typedef struct {
|
||||
char announce_client_ipv6[1]; /* Announced client IPv6, ends with \0. */
|
||||
} clusterMsgPingExtClientIpV6;
|
||||
|
||||
typedef struct {
|
||||
uint32_t length; /* Total length of this extension message (including this header) */
|
||||
uint16_t type; /* Type of this extension message (see clusterMsgPingtypes) */
|
||||
@ -171,6 +181,8 @@ typedef struct {
|
||||
clusterMsgPingExtHumanNodename human_nodename;
|
||||
clusterMsgPingExtForgottenNode forgotten_node;
|
||||
clusterMsgPingExtShardId shard_id;
|
||||
clusterMsgPingExtClientIpV4 announce_client_ipv4;
|
||||
clusterMsgPingExtClientIpV6 announce_client_ipv6;
|
||||
} ext[]; /* Actual extension information, formatted so that the data is 8
|
||||
* byte aligned, regardless of its content. */
|
||||
} clusterMsgPingExt;
|
||||
@ -303,6 +315,8 @@ struct _clusterNode {
|
||||
mstime_t orphaned_time; /* Starting time of orphaned primary condition */
|
||||
long long repl_offset; /* Last known repl offset for this node. */
|
||||
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
|
||||
sds announce_client_ipv4; /* IPv4 for clients only. */
|
||||
sds announce_client_ipv6; /* IPv6 for clients only. */
|
||||
sds hostname; /* The known hostname for this node */
|
||||
sds human_nodename; /* The known human readable nodename for this node */
|
||||
int tcp_port; /* Latest known clients TCP port. */
|
||||
|
33
src/config.c
33
src/config.c
@ -35,6 +35,7 @@
|
||||
|
||||
#include <fcntl.h>
|
||||
#include <sys/stat.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <glob.h>
|
||||
#include <string.h>
|
||||
#include <locale.h>
|
||||
@ -2382,6 +2383,24 @@ static int isValidAnnouncedHostname(char *val, const char **err) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int isValidIpV4(char *val, const char **err) {
|
||||
struct sockaddr_in sa;
|
||||
if (val[0] != '\0' && inet_pton(AF_INET, val, &(sa.sin_addr)) == 0) {
|
||||
*err = "Invalid IPv4 address";
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int isValidIpV6(char *val, const char **err) {
|
||||
struct sockaddr_in6 sa;
|
||||
if (val[0] != '\0' && inet_pton(AF_INET6, val, &(sa.sin6_addr)) == 0) {
|
||||
*err = "Invalid IPv6 address";
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Validate specified string is a valid proc-title-template */
|
||||
static int isValidProcTitleTemplate(char *val, const char **err) {
|
||||
if (!validateProcTitleTemplate(val)) {
|
||||
@ -2623,6 +2642,18 @@ static int updateClusterIp(const char **err) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
int updateClusterClientIpV4(const char **err) {
|
||||
UNUSED(err);
|
||||
clusterUpdateMyselfClientIpV4();
|
||||
return 1;
|
||||
}
|
||||
|
||||
int updateClusterClientIpV6(const char **err) {
|
||||
UNUSED(err);
|
||||
clusterUpdateMyselfClientIpV6();
|
||||
return 1;
|
||||
}
|
||||
|
||||
int updateClusterHostname(const char **err) {
|
||||
UNUSED(err);
|
||||
clusterUpdateMyselfHostname();
|
||||
@ -3081,6 +3112,8 @@ standardConfig static_configs[] = {
|
||||
createStringConfig("replica-announce-ip", "slave-announce-ip", MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.replica_announce_ip, NULL, NULL, NULL),
|
||||
createStringConfig("primaryuser", "masteruser", MODIFIABLE_CONFIG | SENSITIVE_CONFIG, EMPTY_STRING_IS_NULL, server.primary_user, NULL, NULL, NULL),
|
||||
createStringConfig("cluster-announce-ip", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.cluster_announce_ip, NULL, NULL, updateClusterIp),
|
||||
createStringConfig("cluster-announce-client-ipv4", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.cluster_announce_client_ipv4, NULL, isValidIpV4, updateClusterClientIpV4),
|
||||
createStringConfig("cluster-announce-client-ipv6", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.cluster_announce_client_ipv6, NULL, isValidIpV6, updateClusterClientIpV6),
|
||||
createStringConfig("cluster-config-file", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.cluster_configfile, "nodes.conf", isValidClusterConfigFile, NULL),
|
||||
createStringConfig("cluster-announce-hostname", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.cluster_announce_hostname, NULL, isValidAnnouncedHostname, updateClusterHostname),
|
||||
createStringConfig("cluster-announce-human-nodename", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.cluster_announce_human_nodename, NULL, isValidAnnouncedNodename, updateClusterHumanNodename),
|
||||
|
@ -62,8 +62,6 @@ typedef enum {
|
||||
#define CONN_TYPE_TLS "tls"
|
||||
#define CONN_TYPE_MAX 8 /* 8 is enough to be extendable */
|
||||
|
||||
typedef enum connTypeForCaching { CACHE_CONN_TCP, CACHE_CONN_TLS, CACHE_CONN_TYPE_MAX } connTypeForCaching;
|
||||
|
||||
typedef void (*ConnectionCallbackFunc)(struct connection *conn);
|
||||
|
||||
typedef struct ConnectionType {
|
||||
|
40
src/module.c
40
src/module.c
@ -8950,6 +8950,14 @@ size_t VM_GetClusterSize(void) {
|
||||
return getClusterSize();
|
||||
}
|
||||
|
||||
int moduleGetClusterNodeInfoForClient(ValkeyModuleCtx *ctx,
|
||||
client *c,
|
||||
const char *node_id,
|
||||
char *ip,
|
||||
char *primary_id,
|
||||
int *port,
|
||||
int *flags);
|
||||
|
||||
/* Populate the specified info for the node having as ID the specified 'id',
|
||||
* then returns VALKEYMODULE_OK. Otherwise if the format of node ID is invalid
|
||||
* or the node ID does not exist from the POV of this local node, VALKEYMODULE_ERR
|
||||
@ -8971,14 +8979,41 @@ size_t VM_GetClusterSize(void) {
|
||||
* * VALKEYMODULE_NODE_NOFAILOVER: The replica is configured to never failover
|
||||
*/
|
||||
int VM_GetClusterNodeInfo(ValkeyModuleCtx *ctx, const char *id, char *ip, char *primary_id, int *port, int *flags) {
|
||||
return moduleGetClusterNodeInfoForClient(ctx, NULL, id, ip, primary_id, port, flags);
|
||||
}
|
||||
|
||||
/* Like VM_GetClusterNodeInfo(), but returns IP address specifically for the given
|
||||
* client, depending on whether the client is connected over IPv4 or IPv6.
|
||||
*
|
||||
* See also VM_GetClientId(). */
|
||||
int VM_GetClusterNodeInfoForClient(ValkeyModuleCtx *ctx,
|
||||
uint64_t client_id,
|
||||
const char *node_id,
|
||||
char *ip,
|
||||
char *primary_id,
|
||||
int *port,
|
||||
int *flags) {
|
||||
client *c = lookupClientByID(client_id);
|
||||
if (c == NULL) return VALKEYMODULE_ERR;
|
||||
return moduleGetClusterNodeInfoForClient(ctx, c, node_id, ip, primary_id, port, flags);
|
||||
}
|
||||
|
||||
|
||||
int moduleGetClusterNodeInfoForClient(ValkeyModuleCtx *ctx,
|
||||
client *c,
|
||||
const char *node_id,
|
||||
char *ip,
|
||||
char *primary_id,
|
||||
int *port,
|
||||
int *flags) {
|
||||
UNUSED(ctx);
|
||||
|
||||
clusterNode *node = clusterLookupNode(id, strlen(id));
|
||||
clusterNode *node = clusterLookupNode(node_id, strlen(node_id));
|
||||
if (node == NULL || clusterNodePending(node)) {
|
||||
return VALKEYMODULE_ERR;
|
||||
}
|
||||
|
||||
if (ip) valkey_strlcpy(ip, clusterNodeIp(node), NET_IP_STR_LEN);
|
||||
if (ip) valkey_strlcpy(ip, clusterNodeIp(node, c), NET_IP_STR_LEN);
|
||||
|
||||
if (primary_id) {
|
||||
/* If the information is not available, the function will set the
|
||||
@ -13708,6 +13743,7 @@ void moduleRegisterCoreAPI(void) {
|
||||
REGISTER_API(RegisterClusterMessageReceiver);
|
||||
REGISTER_API(SendClusterMessage);
|
||||
REGISTER_API(GetClusterNodeInfo);
|
||||
REGISTER_API(GetClusterNodeInfoForClient);
|
||||
REGISTER_API(GetClusterNodesList);
|
||||
REGISTER_API(FreeClusterNodesList);
|
||||
REGISTER_API(CreateTimer);
|
||||
|
@ -3109,6 +3109,16 @@ char *getClientSockname(client *c) {
|
||||
return c->sockname;
|
||||
}
|
||||
|
||||
int isClientConnIpV6(client *c) {
|
||||
/* The cached client peer id is on the form "[IPv6]:port" for IPv6
|
||||
* addresses, so we just check for '[' here. */
|
||||
if (c->conn->type == NULL && server.current_client) {
|
||||
/* Fake client? Use current client instead. */
|
||||
c = server.current_client;
|
||||
}
|
||||
return getClientPeerId(c)[0] == '[';
|
||||
}
|
||||
|
||||
/* Concatenate a string representing the state of a client in a human
|
||||
* readable format, into the sds string 's'. */
|
||||
sds catClientInfoString(sds s, client *client) {
|
||||
|
14
src/server.h
14
src/server.h
@ -1505,6 +1505,15 @@ struct malloc_stats {
|
||||
size_t allocator_frag_smallbins_bytes;
|
||||
};
|
||||
|
||||
/*-----------------------------------------------------------------------------
|
||||
* Cached state per client connection type flags (bitwise or)
|
||||
*-----------------------------------------------------------------------------*/
|
||||
|
||||
#define CACHE_CONN_TYPE_TLS (1 << 0)
|
||||
#define CACHE_CONN_TYPE_IPv6 (1 << 1)
|
||||
#define CACHE_CONN_TYPE_RESP3 (1 << 2)
|
||||
#define CACHE_CONN_TYPE_MAX (1 << 3)
|
||||
|
||||
/*-----------------------------------------------------------------------------
|
||||
* TLS Context Configuration
|
||||
*----------------------------------------------------------------------------*/
|
||||
@ -2052,6 +2061,8 @@ struct valkeyServer {
|
||||
int cluster_replica_no_failover; /* Prevent replica from starting a failover
|
||||
if the primary is in failure state. */
|
||||
char *cluster_announce_ip; /* IP address to announce on cluster bus. */
|
||||
char *cluster_announce_client_ipv4; /* IPv4 for clients, to announce on cluster bus. */
|
||||
char *cluster_announce_client_ipv6; /* IPv6 for clients, to announce on cluster bus. */
|
||||
char *cluster_announce_hostname; /* hostname to announce on cluster bus. */
|
||||
char *cluster_announce_human_nodename; /* Human readable node name assigned to a node. */
|
||||
int cluster_preferred_endpoint_type; /* Use the announced hostname when available. */
|
||||
@ -2070,7 +2081,7 @@ struct valkeyServer {
|
||||
* dropping packets of a specific type */
|
||||
/* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */
|
||||
uint32_t debug_cluster_close_link_on_packet_drop : 1;
|
||||
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX][4]; /* Align to RESP3 */
|
||||
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX]; /* Index in array is a bitwise or of CACHE_CONN_TYPE_* */
|
||||
/* Scripting */
|
||||
mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */
|
||||
int pre_command_oom_state; /* OOM before command (script?) was started */
|
||||
@ -2707,6 +2718,7 @@ void freeClientReplyValue(void *o);
|
||||
void *dupClientReplyValue(void *o);
|
||||
char *getClientPeerId(client *client);
|
||||
char *getClientSockName(client *client);
|
||||
int isClientConnIpV6(client *c);
|
||||
sds catClientInfoString(sds s, client *client);
|
||||
sds getAllClientsInfoString(int type);
|
||||
int clientSetName(client *c, robj *name, const char **err);
|
||||
|
@ -1472,6 +1472,13 @@ VALKEYMODULE_API int (*ValkeyModule_GetClusterNodeInfo)(ValkeyModuleCtx *ctx,
|
||||
char *primary_id,
|
||||
int *port,
|
||||
int *flags) VALKEYMODULE_ATTR;
|
||||
VALKEYMODULE_API int (*ValkeyModule_GetClusterNodeInfoForClient)(ValkeyModuleCtx *ctx,
|
||||
uint64_t client_id,
|
||||
const char *node_id,
|
||||
char *ip,
|
||||
char *primary_id,
|
||||
int *port,
|
||||
int *flags) VALKEYMODULE_ATTR;
|
||||
VALKEYMODULE_API char **(*ValkeyModule_GetClusterNodesList)(ValkeyModuleCtx *ctx, size_t *numnodes)VALKEYMODULE_ATTR;
|
||||
VALKEYMODULE_API void (*ValkeyModule_FreeClusterNodesList)(char **ids) VALKEYMODULE_ATTR;
|
||||
VALKEYMODULE_API ValkeyModuleTimerID (*ValkeyModule_CreateTimer)(ValkeyModuleCtx *ctx,
|
||||
@ -1938,6 +1945,7 @@ static int ValkeyModule_Init(ValkeyModuleCtx *ctx, const char *name, int ver, in
|
||||
VALKEYMODULE_GET_API(RegisterClusterMessageReceiver);
|
||||
VALKEYMODULE_GET_API(SendClusterMessage);
|
||||
VALKEYMODULE_GET_API(GetClusterNodeInfo);
|
||||
VALKEYMODULE_GET_API(GetClusterNodeInfoForClient);
|
||||
VALKEYMODULE_GET_API(GetClusterNodesList);
|
||||
VALKEYMODULE_GET_API(FreeClusterNodesList);
|
||||
VALKEYMODULE_GET_API(CreateTimer);
|
||||
|
@ -345,6 +345,27 @@ proc are_hostnames_propagated {match_string} {
|
||||
return 1
|
||||
}
|
||||
|
||||
# Check if cluster's announced IPs are consistent and match a pattern
|
||||
# Optionally, a list of clients can be supplied.
|
||||
proc are_cluster_announced_ips_propagated {match_string {clients {}}} {
|
||||
for {set j 0} {$j < [llength $::servers]} {incr j} {
|
||||
if {$clients eq {}} {
|
||||
set client [srv [expr -1*$j] "client"]
|
||||
} else {
|
||||
set client [lindex $clients $j]
|
||||
}
|
||||
set cfg [$client cluster slots]
|
||||
foreach node $cfg {
|
||||
for {set i 2} {$i < [llength $node]} {incr i} {
|
||||
if {! [string match $match_string [lindex [lindex $node $i] 0]] } {
|
||||
return 0
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return 1
|
||||
}
|
||||
|
||||
proc wait_node_marked_fail {ref_node_index instance_id_to_check} {
|
||||
wait_for_condition 1000 50 {
|
||||
[check_cluster_node_mark fail $ref_node_index $instance_id_to_check]
|
||||
|
@ -622,7 +622,7 @@ proc start_server {options {code undefined}} {
|
||||
# setup properties to be able to initialize a client object
|
||||
set port_param [expr $::tls ? {"tls-port"} : {"port"}]
|
||||
set host $::host
|
||||
if {[dict exists $config bind]} { set host [dict get $config bind] }
|
||||
if {[dict exists $config bind]} { set host [lindex [dict get $config bind] 0] }
|
||||
if {[dict exists $config $port_param]} { set port [dict get $config $port_param] }
|
||||
|
||||
# setup config dict
|
||||
|
149
tests/unit/cluster/announce-client-ip.tcl
Normal file
149
tests/unit/cluster/announce-client-ip.tcl
Normal file
@ -0,0 +1,149 @@
|
||||
# Small cluster. No need for failovers.
|
||||
start_cluster 2 2 {tags {external:skip cluster} overrides {cluster-replica-no-failover yes}} {
|
||||
|
||||
test "Set cluster announced IPv4 to invalid IP" {
|
||||
catch {R 0 config set cluster-announce-client-ipv4 banana} e
|
||||
assert_match "*Invalid IPv4 address*" $e
|
||||
}
|
||||
|
||||
test "Set cluster announced IPv4 and check that it propagates" {
|
||||
for {set j 0} {$j < [llength $::servers]} {incr j} {
|
||||
set res [R $j config set cluster-announce-client-ipv4 "111.222.111.$j"]
|
||||
}
|
||||
|
||||
# CLUSTER SLOTS
|
||||
wait_for_condition 50 100 {
|
||||
[are_cluster_announced_ips_propagated {111.222.111.*}]
|
||||
} else {
|
||||
fail "cluster-announce-client-ipv4 were not propagated"
|
||||
}
|
||||
|
||||
# CLUSTER SHARDS
|
||||
for {set j 0} {$j < [llength $::servers]} {incr j} {
|
||||
foreach shard [R $j CLUSTER SHARDS] {
|
||||
foreach node [dict get $shard "nodes"] {
|
||||
set ip [dict get $node "ip"]
|
||||
set endpoint [dict get $node "endpoint"]
|
||||
assert_match "111.222.111*" $ip
|
||||
assert_match "111.222.111*" $endpoint
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# CLUSTER NODES
|
||||
for {set j 0} {$j < [llength $::servers]} {incr j} {
|
||||
set lines [split [R $j CLUSTER NODES] "\r\n"]
|
||||
foreach l $lines {
|
||||
set l [string trim $l]
|
||||
if {$l eq {}} continue
|
||||
assert_equal 1 [regexp {^[0-9a-f]+ 111\.222\.111\.[0-9]} $l]
|
||||
}
|
||||
}
|
||||
|
||||
# Redirects
|
||||
catch {R 0 set foo foo} e
|
||||
assert_match "MOVED * 111.222.111*:*" $e
|
||||
|
||||
# Now that everything is propagated, assert everyone agrees
|
||||
wait_for_cluster_propagation
|
||||
}
|
||||
|
||||
test "Clear announced client IPv4 and check that it propagates" {
|
||||
for {set j 0} {$j < [llength $::servers]} {incr j} {
|
||||
R $j config set cluster-announce-client-ipv4 ""
|
||||
}
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[are_cluster_announced_ips_propagated "127.0.0.1"] eq 1
|
||||
} else {
|
||||
fail "Cleared cluster-announce-client-ipv4 were not propagated"
|
||||
}
|
||||
|
||||
# Redirect use the IP address
|
||||
catch {R 0 set foo foo} e
|
||||
assert_match "MOVED * 127.0.0.1:*" $e
|
||||
|
||||
# Now that everything is propagated, assert everyone agrees
|
||||
wait_for_cluster_propagation
|
||||
}
|
||||
}
|
||||
|
||||
start_cluster 2 2 {tags {external:skip cluster ipv6} overrides {cluster-replica-no-failover yes bind {127.0.0.1 ::1}}} {
|
||||
# Connecting to localhost as "::1" makes the clients use IPv6.
|
||||
set clients {}
|
||||
for {set j 0} {$j < [llength $::servers]} {incr j} {
|
||||
set level [expr -1 * $j]
|
||||
lappend clients [valkey ::1 [srv $level port] 0 $::tls]
|
||||
}
|
||||
|
||||
test "Set cluster announced IPv6 to invalid IP" {
|
||||
catch {R 0 config set cluster-announce-client-ipv6 banana} e
|
||||
assert_match "*Invalid IPv6 address*" $e
|
||||
}
|
||||
|
||||
test "Set cluster announced IPv6 and check that it propagates" {
|
||||
for {set j 0} {$j < [llength $::servers]} {incr j} {
|
||||
R $j config set cluster-announce-client-ipv6 "cafe:1234::$j"
|
||||
}
|
||||
|
||||
# CLUSTER SLOTS
|
||||
wait_for_condition 50 100 {
|
||||
[are_cluster_announced_ips_propagated "cafe:1234::*" $clients] eq 1
|
||||
} else {
|
||||
fail "cluster-announce-client-ipv6 were not propagated"
|
||||
}
|
||||
|
||||
# CLUSTER SHARDS
|
||||
for {set j 0} {$j < [llength $::servers]} {incr j} {
|
||||
foreach shard [[lindex $clients $j] CLUSTER SHARDS] {
|
||||
foreach node [dict get $shard "nodes"] {
|
||||
set ip [dict get $node "ip"]
|
||||
set endpoint [dict get $node "endpoint"]
|
||||
assert_match "cafe:1234::*" $ip
|
||||
assert_match "cafe:1234::*" $endpoint
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# CLUSTER NODES
|
||||
for {set j 0} {$j < [llength $::servers]} {incr j} {
|
||||
set lines [split [[lindex $clients $j] CLUSTER NODES] "\r\n"]
|
||||
foreach l $lines {
|
||||
set l [string trim $l]
|
||||
if {$l eq {}} continue
|
||||
assert_equal 1 [regexp {^[0-9a-f]+ cafe:1234::[0-9]} $l]
|
||||
}
|
||||
}
|
||||
|
||||
# Redirects
|
||||
catch {[lindex $clients 0] set foo foo} e
|
||||
assert_match "MOVED * cafe:1234::*:*" $e
|
||||
|
||||
# Now that everything is propagated, assert everyone agrees
|
||||
wait_for_cluster_propagation
|
||||
}
|
||||
|
||||
test "Clear announced client IPv6 and check that it propagates" {
|
||||
for {set j 0} {$j < [llength $::servers]} {incr j} {
|
||||
R $j config set cluster-announce-client-ipv6 ""
|
||||
}
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[are_cluster_announced_ips_propagated "127.0.0.1" $clients] eq 1
|
||||
} else {
|
||||
fail "Cleared cluster-announce-client-ipv6 were not propagated"
|
||||
}
|
||||
|
||||
# Redirects
|
||||
catch {[lindex $clients 0] set foo foo} e
|
||||
assert_match "MOVED * 127.0.0.1:*" $e
|
||||
|
||||
# Now that everything is propagated, assert everyone agrees
|
||||
wait_for_cluster_propagation
|
||||
}
|
||||
|
||||
# Close clients
|
||||
for {set j 0} {$j < [llength $::servers]} {incr j} {
|
||||
[lindex $clients $j] close
|
||||
}
|
||||
}
|
18
valkey.conf
18
valkey.conf
@ -1771,22 +1771,28 @@ aof-timestamp-enabled no
|
||||
#
|
||||
# In order to make a cluster work in such environments, a static
|
||||
# configuration where each node knows its public address is needed. The
|
||||
# following four options are used for this scope, and are:
|
||||
# following options are used for this scope, and are:
|
||||
#
|
||||
# * cluster-announce-ip
|
||||
# * cluster-announce-client-ipv4
|
||||
# * cluster-announce-client-ipv6
|
||||
# * cluster-announce-port
|
||||
# * cluster-announce-tls-port
|
||||
# * cluster-announce-bus-port
|
||||
#
|
||||
# Each instructs the node about its address, client ports (for connections
|
||||
# without and with TLS) and cluster message bus port. The information is then
|
||||
# published in the header of the bus packets so that other nodes will be able to
|
||||
# correctly map the address of the node publishing the information.
|
||||
# Each instructs the node about its address, possibly other addresses to expose
|
||||
# to clients, client ports (for connections without and with TLS) and cluster
|
||||
# message bus port. The information is then published in the bus packets so that
|
||||
# other nodes will be able to correctly map the address of the node publishing
|
||||
# the information.
|
||||
#
|
||||
# If tls-cluster is set to yes and cluster-announce-tls-port is omitted or set
|
||||
# to zero, then cluster-announce-port refers to the TLS port. Note also that
|
||||
# cluster-announce-tls-port has no effect if tls-cluster is set to no.
|
||||
#
|
||||
# If cluster-announce-client-ipv4 and cluster-announce-client-ipv6 are omitted,
|
||||
# then cluster-announce-ip is exposed to clients.
|
||||
#
|
||||
# If the above options are not used, the normal cluster auto-detection
|
||||
# will be used instead.
|
||||
#
|
||||
@ -1798,6 +1804,8 @@ aof-timestamp-enabled no
|
||||
# Example:
|
||||
#
|
||||
# cluster-announce-ip 10.1.1.5
|
||||
# cluster-announce-client-ipv4 123.123.123.5
|
||||
# cluster-announce-client-ipv6 2001:db8::8a2e:370:7334
|
||||
# cluster-announce-tls-port 6379
|
||||
# cluster-announce-port 0
|
||||
# cluster-announce-bus-port 6380
|
||||
|
Loading…
Reference in New Issue
Block a user