From 5460c100471fda537468d1f1fa43258638c8a1ce Mon Sep 17 00:00:00 2001 From: Madelyn Olson <34459052+madolson@users.noreply.github.com> Date: Sun, 2 Jan 2022 19:48:29 -0800 Subject: [PATCH] Implement clusterbus message extensions and cluster hostname support (#9530) Implement the ability for cluster nodes to advertise their location with extension messages. --- redis.conf | 26 ++ src/cluster.c | 312 +++++++++++++++++--- src/cluster.h | 33 ++- src/config.c | 39 +++ src/debug.c | 8 + src/server.c | 1 + src/server.h | 14 + tests/cluster/cluster.tcl | 3 +- tests/cluster/tests/27-endpoints.tcl | 221 ++++++++++++++ tests/cluster/tests/includes/init-tests.tcl | 2 + 10 files changed, 609 insertions(+), 50 deletions(-) create mode 100644 tests/cluster/tests/27-endpoints.tcl diff --git a/redis.conf b/redis.conf index 8804aac37..9d0f0dfb7 100644 --- a/redis.conf +++ b/redis.conf @@ -1632,6 +1632,32 @@ lua-time-limit 5000 # PubSub message by default. (client-query-buffer-limit default value is 1gb) # # cluster-link-sendbuf-limit 0 + +# Clusters can configure their announced hostname using this config. This is a common use case for +# applications that need to use TLS Server Name Indication (SNI) or deal with DNS based +# routing. By default this value is only shown as additional metadata in the CLUSTER SLOTS +# command, but can be changed using 'cluster-preferred-endpoint-type' config. This value is +# communicated along the clusterbus to all nodes, setting it to an empty string will remove +# the hostname and also propagate the removal. +# +# cluster-announce-hostname "" + +# Clusters can advertise how clients should connect to them using either their IP address, +# a user defined hostname, or by declaring they have no endpoint. Which endpoint is +# shown as the preferred endpoint is set by using the cluster-preferred-endpoint-type +# config with values 'ip', 'hostname', or 'unknown-endpoint'.
This value controls +# the endpoint returned for MOVED/ASKING requests as well as the first field of CLUSTER SLOTS. +# If the preferred endpoint type is set to hostname, but no announced hostname is set, a '?' +# will be returned instead. +# +# When a cluster advertises itself as having an unknown endpoint, it's indicating that +# the server doesn't know how clients can reach the cluster. This can happen in certain +# networking situations where there are multiple possible routes to the node, and the +# server doesn't know which one the client took. In this case, the server is expecting +# the client to reach out on the same endpoint it used for making the last request, but use +# the port provided in the response. +# +# cluster-preferred-endpoint-type ip # In order to setup your cluster make sure to read the documentation # available at https://redis.io web site. diff --git a/src/cluster.c b/src/cluster.c index 81322a8aa..87a965d96 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -215,6 +215,9 @@ int clusterLoadConfig(char *filename) { n = createClusterNode(argv[0],0); clusterAddNode(n); } + /* Format for the node address information: + * ip:port[@cport][,hostname] */ + /* Address and port */ if ((p = strrchr(argv[1],':')) == NULL) { sdsfreesplitres(argv,argc); @@ -234,6 +237,18 @@ int clusterLoadConfig(char *filename) { * base port. */ n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR; + /* Hostname is an optional argument that defines the endpoint + * that can be reported to clients instead of IP. */ + char *hostname = strchr(p, ','); + if (hostname) { + *hostname = '\0'; + hostname++; + zfree(n->hostname); + n->hostname = zstrdup(hostname); + } else { + n->hostname = NULL; + } + /* The plaintext port for client in a TLS cluster (n->pport) is not * stored in nodes.conf. It is received later over the bus protocol. */ @@ -553,6 +568,31 @@ void clusterUpdateMyselfIp(void) { } } +/* Update the hostname for the specified node with the provided C string.
*/ +static void updateAnnouncedHostname(clusterNode *node, char *new) { + if (!node->hostname && !new) { + return; + } + + /* Previous and new hostname are the same, no need to update. */ + if (new && node->hostname && !strcmp(new, node->hostname)) { + return; + } + + if (node->hostname) zfree(node->hostname); + if (new) { + node->hostname = zstrdup(new); + } else { + node->hostname = NULL; + } +} + +/* Update my hostname based on server configuration values */ +void clusterUpdateMyselfHostname(void) { + if (!myself) return; + updateAnnouncedHostname(myself, server.cluster_announce_hostname); +} + void clusterInit(void) { int saveconf = 0; @@ -646,6 +686,7 @@ void clusterInit(void) { resetManualFailover(); clusterUpdateMyselfFlags(); clusterUpdateMyselfIp(); + clusterUpdateMyselfHostname(); } /* Reset a node performing a soft or hard reset: @@ -918,6 +959,7 @@ clusterNode *createClusterNode(char *nodename, int flags) { node->link = NULL; node->inbound_link = NULL; memset(node->ip,0,sizeof(node->ip)); + node->hostname = NULL; node->port = 0; node->cport = 0; node->pport = 0; @@ -1083,6 +1125,7 @@ void freeClusterNode(clusterNode *n) { nodename = sdsnewlen(n->name, CLUSTER_NAMELEN); serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK); sdsfree(nodename); + zfree(n->hostname); /* Release links and associated data structures. */ if (n->link) freeClusterLink(n->link); @@ -1871,6 +1914,93 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc } } +/* Cluster ping extensions. + * + * The ping/pong/meet messages support arbitrary extensions to add additional + * metadata to the messages that are sent between the various nodes in the + * cluster. 
The extensions take the form: + * [ Header length + type (8 bytes) ] + * [ Extension information (Arbitrary length, but must be 8 byte padded) ] + */ + + +/* Returns the length of a given extension */ +static uint32_t getPingExtLength(clusterMsgPingExt *ext) { + return ntohl(ext->length); +} + +/* Returns the initial position of ping extensions. May return an invalid + * address if there are no ping extensions. */ +static clusterMsgPingExt *getInitialPingExt(clusterMsg *hdr, uint16_t count) { + clusterMsgPingExt *initial = (clusterMsgPingExt*) &(hdr->data.ping.gossip[count]); + return initial; +} + +/* Given a current ping extension, returns the start of the next extension. May return + * an invalid address if there are no further ping extensions. */ +static clusterMsgPingExt *getNextPingExt(clusterMsgPingExt *ext) { + clusterMsgPingExt *next = (clusterMsgPingExt *) (((char *) ext) + getPingExtLength(ext)); + return next; +} + +/* Returns the exact size needed to store the hostname. The returned value + * will be 8 byte padded. */ +int getHostnamePingExtSize() { + /* If hostname is not set, we don't send this extension */ + if (!myself->hostname) return 0; + + int totlen = sizeof(clusterMsgPingExt) + EIGHT_BYTE_ALIGN(strlen(myself->hostname) + 1); + return totlen; +} + +/* Write the hostname ping extension at the start of the cursor. This function + * will update the cursor to point to the end of the written extension and + * will return the amount of bytes written. 
*/ +int writeHostnamePingExt(clusterMsgPingExt **cursor) { + /* If hostname is not set, we don't send this extension */ + if (!myself->hostname) return 0; + + /* Add the hostname information at the extension cursor */ + clusterMsgPingExtHostname *ext = &(*cursor)->ext[0].hostname; + size_t hostname_len = strlen(myself->hostname); + memcpy(ext->hostname, myself->hostname, hostname_len); + uint32_t extension_size = getHostnamePingExtSize(); + + /* Move the write cursor */ + (*cursor)->type = CLUSTERMSG_EXT_TYPE_HOSTNAME; + (*cursor)->length = htonl(extension_size); + /* Make sure the string is NULL terminated by adding 1 */ + *cursor = (clusterMsgPingExt *) (ext->hostname + EIGHT_BYTE_ALIGN(strlen(myself->hostname) + 1)); + return extension_size; +} + +/* We previously validated the extensions, so this function just needs to + * handle the extensions. */ +void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) { + clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender); + char *ext_hostname = NULL; + uint16_t extensions = ntohs(hdr->extensions); + /* Loop through all the extensions and process them */ + clusterMsgPingExt *ext = getInitialPingExt(hdr, ntohs(hdr->count)); + while (extensions--) { + uint16_t type = ntohs(ext->type); + if (type == CLUSTERMSG_EXT_TYPE_HOSTNAME) { + clusterMsgPingExtHostname *hostname_ext = (clusterMsgPingExtHostname *) &(ext->ext[0].hostname); + ext_hostname = hostname_ext->hostname; + } else { + /* Unknown type, we will ignore it but log what happened. */ + serverLog(LL_WARNING, "Received unknown extension type %d", type); + } + + /* We know this will be valid since we validated it ahead of time */ + ext = getNextPingExt(ext); + } + /* If the node did not send us a hostname extension, assume + * they don't have an announced hostname. Otherwise, we'll + * set it now. 
*/ + updateAnnouncedHostname(sender, ext_hostname); +} + static clusterNode *getNodeFromLinkAndMsg(clusterLink *link, clusterMsg *hdr) { clusterNode *sender; if (link->node && !nodeInHandshake(link->node)) { @@ -1920,52 +2050,78 @@ int clusterProcessPacket(clusterLink *link) { return 1; } + if (type == server.cluster_drop_packet_filter) { + serverLog(LL_WARNING, "Dropping packet that matches debug drop filter"); + return 1; + } + uint16_t flags = ntohs(hdr->flags); + uint16_t extensions = ntohs(hdr->extensions); uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0; + uint32_t explen; /* expected length of this packet */ clusterNode *sender; if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) { uint16_t count = ntohs(hdr->count); - uint32_t explen; /* expected length of this packet */ explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); explen += (sizeof(clusterMsgDataGossip)*count); - if (totlen != explen) return 1; + + /* If there is extension data, which doesn't have a fixed length, + * loop through them and validate the length of it now. 
*/ + if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) { + clusterMsgPingExt *ext = getInitialPingExt(hdr, count); + while (extensions--) { + uint16_t extlen = getPingExtLength(ext); + if (extlen % 8 != 0) { + serverLog(LL_WARNING, "Received a %s packet without proper padding (%d bytes)", + clusterGetMessageTypeString(type), (int) extlen); + return 1; + } + if ((totlen - explen) < extlen) { + serverLog(LL_WARNING, "Received invalid %s packet with extension data that exceeds " + "total packet length (%lld)", clusterGetMessageTypeString(type), + (unsigned long long) totlen); + return 1; + } + explen += extlen; + ext = getNextPingExt(ext); + } + } } else if (type == CLUSTERMSG_TYPE_FAIL) { - uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - + explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); explen += sizeof(clusterMsgDataFail); - if (totlen != explen) return 1; } else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) { - uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - + explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); explen += sizeof(clusterMsgDataPublish) - 8 + ntohl(hdr->data.publish.msg.channel_len) + ntohl(hdr->data.publish.msg.message_len); - if (totlen != explen) return 1; } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST || type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK || type == CLUSTERMSG_TYPE_MFSTART) { - uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - - if (totlen != explen) return 1; + explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); } else if (type == CLUSTERMSG_TYPE_UPDATE) { - uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - + explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); explen += sizeof(clusterMsgDataUpdate); - if (totlen != explen) return 1; } else if (type == CLUSTERMSG_TYPE_MODULE) { - uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - + explen = sizeof(clusterMsg)-sizeof(union 
clusterMsgData); explen += sizeof(clusterMsgModule) - 3 + ntohl(hdr->data.module.msg.len); - if (totlen != explen) return 1; + } else { + /* We don't know this type of packet, so we assume it's well formed. */ + explen = totlen; } + if (totlen != explen) { + serverLog(LL_WARNING, "Received invalid %s packet of length %lld but expected length %lld", + clusterGetMessageTypeString(type), (unsigned long long) totlen, (unsigned long long) explen); + return 1; + } + sender = getNodeFromLinkAndMsg(link, hdr); /* Update the last time we saw any data from this node. We @@ -2272,7 +2428,10 @@ int clusterProcessPacket(clusterLink *link) { } /* Get info from the gossip section */ - if (sender) clusterProcessGossipSection(hdr,link); + if (sender) { + clusterProcessGossipSection(hdr,link); + clusterProcessPingExtensions(hdr,link); + } } else if (type == CLUSTERMSG_TYPE_FAIL) { clusterNode *failing; @@ -2695,7 +2854,7 @@ void clusterSendPing(clusterLink *link, int type) { clusterMsg *hdr; int gossipcount = 0; /* Number of gossip sections added so far. */ int wanted; /* Number of gossip sections we want to append if possible. */ - int totlen; /* Total packet length. */ + int estlen; /* Upper bound on estimated packet length */ /* freshnodes is the max number of nodes we can hope to append at all: * nodes available minus two (ourself and the node we are sending the * message to). However practically there may be less valid nodes since @@ -2736,15 +2895,17 @@ void clusterSendPing(clusterLink *link, int type) { * faster to propagate to go from PFAIL to FAIL state. */ int pfail_wanted = server.cluster->stats_pfail_nodes; - /* Compute the maximum totlen to allocate our buffer. We'll fix the totlen + /* Compute the maximum estlen to allocate our buffer. We'll fix the estlen * later according to the number of gossip sections we really were able * to put inside the packet. 
*/ - totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted)); + estlen = sizeof(clusterMsg) - sizeof(union clusterMsgData); + estlen += (sizeof(clusterMsgDataGossip)*(wanted + pfail_wanted)); + estlen += sizeof(clusterMsgPingExt) + getHostnamePingExtSize(); + /* Note: clusterBuildMessageHdr() expects the buffer to be always at least * sizeof(clusterMsg) or more. */ - if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg); - buf = zcalloc(totlen); + if (estlen < (int)sizeof(clusterMsg)) estlen = sizeof(clusterMsg); + buf = zcalloc(estlen); hdr = (clusterMsg*) buf; /* Populate the header. */ @@ -2808,11 +2969,23 @@ void clusterSendPing(clusterLink *link, int type) { dictReleaseIterator(di); } - /* Ready to send... fix the totlen field and queue the message in the - * output buffer. */ - totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); + + int totlen = 0; + int extensions = 0; + /* Set the initial extension position */ + clusterMsgPingExt *cursor = getInitialPingExt(hdr, gossipcount); + /* Add in the extensions */ + if (myself->hostname) { + hdr->mflags[0] |= CLUSTERMSG_FLAG0_EXT_DATA; + totlen += writeHostnamePingExt(&cursor); + extensions++; + } + + /* Compute the actual total length and send! */ + totlen += sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen += (sizeof(clusterMsgDataGossip)*gossipcount); hdr->count = htons(gossipcount); + hdr->extensions = htons(extensions); hdr->totlen = htonl(totlen); clusterSendMessage(link,buf,totlen); zfree(buf); @@ -3786,6 +3959,7 @@ void clusterCron(void) { iteration++; /* Number of times this function was called so far. */ + updateAnnouncedHostname(myself, server.cluster_announce_hostname); /* The handshake timeout is the time after which a handshake node that was * not turned into a normal node is removed from the nodes. 
Usually it is * just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use @@ -4404,10 +4578,18 @@ sds clusterGenNodeDescription(clusterNode *node, int use_pport) { /* Node coordinates */ ci = sdscatlen(sdsempty(),node->name,CLUSTER_NAMELEN); - ci = sdscatfmt(ci," %s:%i@%i ", - node->ip, - port, - node->cport); + if (node->hostname) { + ci = sdscatfmt(ci," %s:%i@%i,%s ", + node->ip, + port, + node->cport, + node->hostname); + } else { + ci = sdscatfmt(ci," %s:%i@%i ", + node->ip, + port, + node->cport); + } /* Flags */ ci = representClusterNodeFlags(ci, node->flags); @@ -4619,6 +4801,15 @@ void addReplyClusterLinksDescription(client *c) { * CLUSTER command * -------------------------------------------------------------------------- */ +const char *getPreferredEndpoint(clusterNode *n) { + switch(server.cluster_preferred_endpoint_type) { + case CLUSTER_ENDPOINT_TYPE_IP: return n->ip; + case CLUSTER_ENDPOINT_TYPE_HOSTNAME: return n->hostname ? n->hostname : "?"; + case CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT: return ""; + } + return "unknown"; +} + const char *clusterGetMessageTypeString(int type) { switch(type) { case CLUSTERMSG_TYPE_PING: return "ping"; @@ -4702,31 +4893,56 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) { } } -void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) { - int i, nested_elements = 3; /* slots (2) + master addr (1) */ - void *nested_replylen = addReplyDeferredLen(c); - addReplyLongLong(c, start_slot); - addReplyLongLong(c, end_slot); - addReplyArrayLen(c, 3); - addReplyBulkCString(c, node->ip); +void addNodeToNodeReply(client *c, clusterNode *node) { + addReplyArrayLen(c, 4); + if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_IP) { + addReplyBulkCString(c, node->ip); + } else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_HOSTNAME) { + addReplyBulkCString(c, node->hostname ? 
node->hostname : "?"); + } else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT) { + addReplyNull(c); + } else { + serverPanic("Unrecognized preferred endpoint type"); + } + /* Report non-TLS ports to non-TLS client in TLS cluster if available. */ int use_pport = (server.tls_cluster && c->conn && connGetType(c->conn) != CONN_TYPE_TLS); addReplyLongLong(c, use_pport && node->pport ? node->pport : node->port); addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN); + /* Add the additional endpoint information, this is all the known networking information + * that is not the preferred endpoint. */ + void *deflen = addReplyDeferredLen(c); + int length = 0; + if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) { + addReplyBulkCString(c, "ip"); + addReplyBulkCString(c, node->ip); + length++; + } + if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME + && node->hostname) + { + addReplyBulkCString(c, "hostname"); + addReplyBulkCString(c, node->hostname); + length++; + } + setDeferredMapLen(c, deflen, length); +} + +void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) { + int i, nested_elements = 3; /* slots (2) + master addr (1) */ + void *nested_replylen = addReplyDeferredLen(c); + addReplyLongLong(c, start_slot); + addReplyLongLong(c, end_slot); + addNodeToNodeReply(c, node); + /* Remaining nodes in reply are replicas for slot range */ for (i = 0; i < node->numslaves; i++) { /* This loop is copy/pasted from clusterGenNodeDescription() * with modifications for per-slot node aggregation. */ if (!isReplicaAvailable(node->slaves[i])) continue; - addReplyArrayLen(c, 3); - addReplyBulkCString(c, node->slaves[i]->ip); - /* Report slave's non-TLS port to non-TLS client in TLS cluster */ - addReplyLongLong(c, (use_pport && node->slaves[i]->pport ? 
- node->slaves[i]->pport : - node->slaves[i]->port)); - addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN); + addNodeToNodeReply(c, node->slaves[i]); nested_elements++; } setDeferredArrayLen(c, nested_replylen, nested_elements); @@ -4864,7 +5080,7 @@ NULL /* Report plaintext ports, only if cluster is TLS but client is known to * be non-TLS). */ int use_pport = (server.tls_cluster && - c->conn && connGetType(c->conn) != CONN_TYPE_TLS); + c->conn && connGetType(c->conn) != CONN_TYPE_TLS); sds nodes = clusterGenNodesDescription(0, use_pport); addReplyVerbatim(c,nodes,sdslen(nodes),"txt"); sdsfree(nodes); @@ -6391,12 +6607,12 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co /* Redirect to IP:port. Include plaintext port if cluster is TLS but * client is non-TLS. */ int use_pport = (server.tls_cluster && - c->conn && connGetType(c->conn) != CONN_TYPE_TLS); + c->conn && connGetType(c->conn) != CONN_TYPE_TLS); int port = use_pport && n->pport ? n->pport : n->port; addReplyErrorSds(c,sdscatprintf(sdsempty(), "-%s %d %s:%d", (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED", - hashslot, n->ip, port)); + hashslot, getPreferredEndpoint(n), port)); } else { serverPanic("getNodeByQuery() unknown error."); } diff --git a/src/cluster.h b/src/cluster.h index a28176e4b..8b76ed19e 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -135,6 +135,7 @@ typedef struct clusterNode { mstime_t orphaned_time; /* Starting time of orphaned master condition */ long long repl_offset; /* Last known repl offset for this node. */ char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */ + char *hostname; /* The known hostname for this node */ int port; /* Latest known clients port (TLS or plain). */ int pport; /* Latest known clients plaintext port. Only used if the main clients port is for TLS. */ @@ -245,11 +246,38 @@ typedef struct { unsigned char bulk_data[3]; /* 3 bytes just as placeholder. 
*/ } clusterMsgModule; +/* The cluster supports optional extension messages that can be sent + * along with ping/pong/meet messages to give additional info in a + * consistent manner. */ +typedef enum { + CLUSTERMSG_EXT_TYPE_HOSTNAME, +} clusterMsgPingtypes; + +/* Helper function for making sure extensions are eight byte aligned. */ +#define EIGHT_BYTE_ALIGN(size) ((((size) + 7) / 8) * 8) + +typedef struct { + char hostname[1]; /* The announced hostname, ends with \0. */ +} clusterMsgPingExtHostname; + +typedef struct { + uint32_t length; /* Total length of this extension message (including this header) */ + uint16_t type; /* Type of this extension message (see clusterMsgPingExtTypes) */ + uint16_t unused; /* 16 bits of padding to make this structure 8 byte aligned. */ + union { + clusterMsgPingExtHostname hostname; + } ext[]; /* Actual extension information, formatted so that the data is 8 + * byte aligned, regardless of its content. */ +} clusterMsgPingExt; + union clusterMsgData { /* PING, MEET and PONG */ struct { /* Array of N clusterMsgDataGossip structures */ clusterMsgDataGossip gossip[1]; + /* Extension data that can optionally be sent for ping/meet/pong + * messages. We can't explicitly define them here though, since + * the gossip array isn't the real length of the gossip data. */ } ping; /* FAIL */ @@ -292,7 +320,8 @@ typedef struct { unsigned char myslots[CLUSTER_SLOTS/8]; char slaveof[CLUSTER_NAMELEN]; char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */ - char notused1[32]; /* 32 bytes reserved for future usage. */ + uint16_t extensions; /* Number of extensions sent along with this packet. */ + char notused1[16]; /* 16 bytes reserved for future usage. */ uint16_t pport; /* Sender TCP plaintext port, if base port is TLS */ uint16_t cport; /* Sender TCP cluster bus port */ uint16_t flags; /* Sender node flags */ @@ -308,6 +337,7 @@ typedef struct { #define CLUSTERMSG_FLAG0_PAUSED (1<<0) /* Master paused for manual failover. 
*/ #define CLUSTERMSG_FLAG0_FORCEACK (1<<1) /* Give ACK to AUTH_REQUEST even if master is up. */ +#define CLUSTERMSG_FLAG0_EXT_DATA (1<<2) /* Message contains extension data */ /* ---------------------- API exported outside cluster.c -------------------- */ void clusterInit(void); @@ -334,5 +364,6 @@ void clusterUpdateMyselfFlags(void); void clusterUpdateMyselfIp(void); void slotToChannelAdd(sds channel); void slotToChannelDel(sds channel); +void clusterUpdateMyselfHostname(void); #endif /* __CLUSTER_H */ diff --git a/src/config.c b/src/config.c index 317b92ea2..0fc93d1de 100644 --- a/src/config.c +++ b/src/config.c @@ -141,6 +141,13 @@ configEnum protected_action_enum[] = { {NULL, 0} }; +configEnum cluster_preferred_endpoint_type_enum[] = { + {"ip", CLUSTER_ENDPOINT_TYPE_IP}, + {"hostname", CLUSTER_ENDPOINT_TYPE_HOSTNAME}, + {"unknown-endpoint", CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT}, + {NULL, 0} +}; + /* Output buffer limits presets. */ clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = { {0, 0, 0}, /* normal */ @@ -2150,6 +2157,30 @@ static int isValidAOFfilename(char *val, const char **err) { return 1; } +static int isValidAnnouncedHostname(char *val, const char **err) { + if (strlen(val) >= NET_HOST_STR_LEN) { + *err = "Hostnames must be less than " + STRINGIFY(NET_HOST_STR_LEN) " characters"; + return 0; + } + + int i = 0; + char c; + while ((c = val[i])) { + /* We just validate the character set to make sure that everything + * is parsed and handled correctly. 
*/ + if (!((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') + || (c >= '0' && c <= '9') || (c == '-') || (c == '.'))) + { + *err = "Hostnames may only contain alphanumeric characters, " + "hyphens or dots"; + return 0; + } + c = val[i++]; + } + return 1; +} + /* Validate specified string is a valid proc-title-template */ static int isValidProcTitleTemplate(char *val, const char **err) { if (!validateProcTitleTemplate(val)) { @@ -2305,6 +2336,12 @@ static int updateClusterIp(const char **err) { return 1; } +int updateClusterHostname(const char **err) { + UNUSED(err); + clusterUpdateMyselfHostname(); + return 1; +} + #ifdef USE_OPENSSL static int applyTlsCfg(const char **err) { UNUSED(err); @@ -2652,6 +2689,7 @@ standardConfig configs[] = { createStringConfig("masteruser", NULL, MODIFIABLE_CONFIG | SENSITIVE_CONFIG, EMPTY_STRING_IS_NULL, server.masteruser, NULL, NULL, NULL), createStringConfig("cluster-announce-ip", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.cluster_announce_ip, NULL, NULL, updateClusterIp), createStringConfig("cluster-config-file", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.cluster_configfile, "nodes.conf", NULL, NULL), + createStringConfig("cluster-announce-hostname", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.cluster_announce_hostname, NULL, isValidAnnouncedHostname, updateClusterHostname), createStringConfig("syslog-ident", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.syslog_ident, "redis", NULL, NULL), createStringConfig("dbfilename", NULL, MODIFIABLE_CONFIG | PROTECTED_CONFIG, ALLOW_EMPTY_STRING, server.rdb_filename, "dump.rdb", isValidDBfilename, NULL), createStringConfig("appendfilename", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.aof_filename, "appendonly.aof", isValidAOFfilename, NULL), @@ -2681,6 +2719,7 @@ standardConfig configs[] = { createEnumConfig("enable-protected-configs", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_protected_configs, PROTECTED_ACTION_ALLOWED_NO, NULL, 
NULL), createEnumConfig("enable-debug-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_debug_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL), createEnumConfig("enable-module-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_module_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL), + createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, NULL), /* Integer configs */ createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL), diff --git a/src/debug.c b/src/debug.c index f01509e84..0b38b1856 100644 --- a/src/debug.c +++ b/src/debug.c @@ -425,6 +425,8 @@ void debugCommand(client *c) { #endif "OBJECT ", " Show low level info about `key` and associated value.", +"DROP-CLUSTER-PACKET-FILTER ", +" Drop all packets that match the filtered type. Set to -1 allow all packets.", "OOM", " Crash the server simulating an out-of-memory error.", "PANIC", @@ -575,6 +577,12 @@ NULL server.dirty = 0; /* Prevent AOF / replication */ serverLog(LL_WARNING,"Append Only File loaded by DEBUG LOADAOF"); addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"drop-cluster-packet-filter") && c->argc == 3) { + long packet_type; + if (getLongFromObjectOrReply(c, c->argv[2], &packet_type, NULL) != C_OK) + return; + server.cluster_drop_packet_filter = packet_type; + addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) { dictEntry *de; robj *val; diff --git a/src/server.c b/src/server.c index 4770a5df0..78f3c8bfb 100644 --- a/src/server.c +++ b/src/server.c @@ -2293,6 +2293,7 @@ void initServer(void) { server.blocked_last_cron = 0; server.blocking_op_nesting = 0; server.thp_enabled = 0; + server.cluster_drop_packet_filter = -1; resetReplicationBuffer(); if ((server.tls_port || server.tls_replication || server.tls_cluster) diff --git 
a/src/server.h b/src/server.h index c1a0af355..fb22b3cf7 100644 --- a/src/server.h +++ b/src/server.h @@ -527,6 +527,13 @@ typedef struct { mstime_t end; } pause_event; +/* Ways that a cluster's endpoint can be described */ +typedef enum { + CLUSTER_ENDPOINT_TYPE_IP = 0, /* Show IP address */ + CLUSTER_ENDPOINT_TYPE_HOSTNAME, /* Show hostname */ + CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT /* Show NULL or empty */ +} cluster_endpoint_type; + /* RDB active child save type. */ #define RDB_CHILD_TYPE_NONE 0 #define RDB_CHILD_TYPE_DISK 1 /* RDB is written to disk. */ @@ -1771,6 +1778,8 @@ struct redisServer { int cluster_slave_no_failover; /* Prevent slave from starting a failover if the master is in failure state. */ char *cluster_announce_ip; /* IP address to announce on cluster bus. */ + char *cluster_announce_hostname; /* Hostname to announce on cluster bus. */ + int cluster_preferred_endpoint_type; /* Endpoint type reported to clients (see cluster_endpoint_type). */ int cluster_announce_port; /* base port to announce on cluster bus. */ int cluster_announce_tls_port; /* TLS port to announce on cluster bus. */ int cluster_announce_bus_port; /* bus port to announce on cluster bus. */ @@ -1782,6 +1791,8 @@ struct redisServer { is down?
*/ int cluster_config_file_lock_fd; /* cluster config fd, will be flock */ unsigned long long cluster_link_sendbuf_limit_bytes; /* Memory usage limit on individual link send buffers*/ + int cluster_drop_packet_filter; /* Debug config that allows tactically + * dropping packets of a specific type */ /* Scripting */ client *script_caller; /* The client running script right now, or NULL */ mstime_t script_time_limit; /* Script timeout in milliseconds */ @@ -3334,4 +3345,7 @@ int isTlsConfigured(void); int iAmMaster(void); +#define STRINGIFY_(x) #x +#define STRINGIFY(x) STRINGIFY_(x) + #endif diff --git a/tests/cluster/cluster.tcl b/tests/cluster/cluster.tcl index 31e0c3667..531e90d6c 100644 --- a/tests/cluster/cluster.tcl +++ b/tests/cluster/cluster.tcl @@ -142,7 +142,8 @@ proc cluster_allocate_with_continuous_slots {n} { } } -# Create a cluster composed of the specified number of masters and slaves with continuous slots. +# Create a cluster composed of the specified number of masters and slaves, +# but with a continuous slot range. proc cluster_create_with_continuous_slots {masters slaves} { cluster_allocate_with_continuous_slots $masters if {$slaves} { diff --git a/tests/cluster/tests/27-endpoints.tcl b/tests/cluster/tests/27-endpoints.tcl new file mode 100644 index 000000000..0cadee2eb --- /dev/null +++ b/tests/cluster/tests/27-endpoints.tcl @@ -0,0 +1,221 @@ +source "../tests/includes/init-tests.tcl" + +# Check if cluster's view of hostnames is consistent +proc are_hostnames_propagated {match_string} { + for {set j 0} {$j < $::cluster_master_nodes + $::cluster_replica_nodes} {incr j} { + set cfg [R $j cluster slots] + foreach node $cfg { + for {set i 2} {$i < [llength $node]} {incr i} { + if {! 
[string match $match_string [lindex [lindex [lindex $node $i] 3] 1]] } { + return 0 + } + } + } + } + return 1 +} + +# Isolate node $id from the cluster: hard reset it (which gives it a new nodeid) and have every other instance forget its old nodeid +proc isolate_node {id} { + set node_id [R $id CLUSTER MYID] + R $id CLUSTER RESET HARD + for {set j 0} {$j < 20} {incr j} { + if { $j eq $id } { + continue + } + R $j CLUSTER FORGET $node_id + } +} + +proc get_slot_field {slot_output shard_id node_id attrib_id} { + return [lindex [lindex [lindex $slot_output $shard_id] $node_id] $attrib_id] +} + +test "Create a 6 nodes cluster" { + cluster_create_with_continuous_slots 3 3 +} + +test "Cluster should start ok" { + assert_cluster_state ok + wait_for_cluster_propagation +} + +test "Set cluster hostnames and verify they are propagated" { + for {set j 0} {$j < $::cluster_master_nodes + $::cluster_replica_nodes} {incr j} { + R $j config set cluster-announce-hostname "host-$j.com" + } + + wait_for_condition 50 100 { + [are_hostnames_propagated "host-*.com"] eq 1 + } else { + fail "cluster hostnames were not propagated" + } + + # Now that everything is propagated, assert everyone agrees + wait_for_cluster_propagation +} + +test "Update hostnames and make sure they are all eventually propagated" { + for {set j 0} {$j < $::cluster_master_nodes + $::cluster_replica_nodes} {incr j} { + R $j config set cluster-announce-hostname "host-updated-$j.com" + } + + wait_for_condition 50 100 { + [are_hostnames_propagated "host-updated-*.com"] eq 1 + } else { + fail "cluster hostnames were not propagated" + } + + # Now that everything is propagated, assert everyone agrees + wait_for_cluster_propagation +} + +test "Remove hostnames and make sure they are all eventually propagated" { + for {set j 0} {$j < $::cluster_master_nodes + $::cluster_replica_nodes} {incr j} { + R $j config set cluster-announce-hostname "" + } + + wait_for_condition 50 100 { + [are_hostnames_propagated ""] eq 1 + } else { + fail "cluster hostnames were not propagated" + } + + # Now that
everything is propagated, assert everyone agrees + wait_for_cluster_propagation +} + +test "Verify cluster-preferred-endpoint-type behavior for redirects and info" { + R 0 config set cluster-announce-hostname "me.com" + R 1 config set cluster-announce-hostname "" + R 2 config set cluster-announce-hostname "them.com" + + wait_for_cluster_propagation + + # Verify default behavior + set slot_result [R 0 cluster slots] + assert_equal "" [lindex [get_slot_field $slot_result 0 2 0] 1] + assert_equal "" [lindex [get_slot_field $slot_result 2 2 0] 1] + assert_equal "hostname" [lindex [get_slot_field $slot_result 0 2 3] 0] + assert_equal "me.com" [lindex [get_slot_field $slot_result 0 2 3] 1] + assert_equal "hostname" [lindex [get_slot_field $slot_result 2 2 3] 0] + assert_equal "them.com" [lindex [get_slot_field $slot_result 2 2 3] 1] + + # Redirect will use the IP address + catch {R 0 set foo foo} redir_err + assert_match "MOVED * 127.0.0.1:*" $redir_err + + # Verify prefer hostname behavior + R 0 config set cluster-preferred-endpoint-type hostname + + set slot_result [R 0 cluster slots] + assert_equal "me.com" [get_slot_field $slot_result 0 2 0] + assert_equal "them.com" [get_slot_field $slot_result 2 2 0] + + # Redirect should use hostname + catch {R 0 set foo foo} redir_err + assert_match "MOVED * them.com:*" $redir_err + + # Redirect to an unknown hostname returns ? 
+ catch {R 0 set barfoo bar} redir_err + assert_match "MOVED * ?:*" $redir_err + + # Verify unknown endpoint behavior + R 0 config set cluster-preferred-endpoint-type unknown-endpoint + + # With an unknown endpoint, the IP and hostname are reported as metadata + set slot_result [R 0 cluster slots] + assert_equal "ip" [lindex [get_slot_field $slot_result 0 2 3] 0] + assert_equal "127.0.0.1" [lindex [get_slot_field $slot_result 0 2 3] 1] + assert_equal "ip" [lindex [get_slot_field $slot_result 2 2 3] 0] + assert_equal "127.0.0.1" [lindex [get_slot_field $slot_result 2 2 3] 1] + assert_equal "ip" [lindex [get_slot_field $slot_result 1 2 3] 0] + assert_equal "127.0.0.1" [lindex [get_slot_field $slot_result 1 2 3] 1] + # Not required by the protocol, but IP comes before hostname + assert_equal "hostname" [lindex [get_slot_field $slot_result 0 2 3] 2] + assert_equal "me.com" [lindex [get_slot_field $slot_result 0 2 3] 3] + assert_equal "hostname" [lindex [get_slot_field $slot_result 2 2 3] 2] + assert_equal "them.com" [lindex [get_slot_field $slot_result 2 2 3] 3] + + # This node doesn't have a hostname + assert_equal 2 [llength [get_slot_field $slot_result 1 2 3]] + + # Redirect should use empty string + catch {R 0 set foo foo} redir_err + assert_match "MOVED * :*" $redir_err + + R 0 config set cluster-preferred-endpoint-type ip +} + +test "Verify the nodes configured with prefer hostname only show hostname for new nodes" { + # Have everyone forget node 6 and isolate it from the cluster. + isolate_node 6 + + # Set hostnames for the primaries, now that the node is isolated + R 0 config set cluster-announce-hostname "shard-1.com" + R 1 config set cluster-announce-hostname "shard-2.com" + R 2 config set cluster-announce-hostname "shard-3.com" + + # Prevent Node 0 and Node 6 from properly meeting, + # they'll hang in the handshake phase. This allows us to + # test the case where we "know" about it but haven't + # successfully retrieved information about it yet.
+ R 0 DEBUG DROP-CLUSTER-PACKET-FILTER 0 + R 6 DEBUG DROP-CLUSTER-PACKET-FILTER 0 + + # Have a replica meet the isolated node + R 3 cluster meet 127.0.0.1 [get_instance_attrib redis 6 port] + + # Now, we wait for the two nodes that aren't filtering packets + # to accept our isolated node's connections. At this point they will + # start showing up in cluster slots. + wait_for_condition 50 100 { + [llength [R 6 CLUSTER SLOTS]] eq 2 + } else { + fail "Node did not learn about the 2 shards it can talk to" + } + set slot_result [R 6 CLUSTER SLOTS] + assert_equal [lindex [get_slot_field $slot_result 0 2 3] 1] "shard-2.com" + assert_equal [lindex [get_slot_field $slot_result 1 2 3] 1] "shard-3.com" + + # Also make sure we know about the isolated primary, we + # just can't reach it. + set primary_id [R 0 CLUSTER MYID] + assert_match "*$primary_id*" [R 6 CLUSTER NODES] + + # Stop dropping cluster packets, and make sure everything + # stabilizes + R 0 DEBUG DROP-CLUSTER-PACKET-FILTER -1 + R 6 DEBUG DROP-CLUSTER-PACKET-FILTER -1 + + wait_for_condition 50 100 { + [llength [R 6 CLUSTER SLOTS]] eq 3 + } else { + fail "Node did not learn about all 3 shards" + } + set slot_result [R 6 CLUSTER SLOTS] + assert_equal [lindex [get_slot_field $slot_result 0 2 3] 1] "shard-1.com" + assert_equal [lindex [get_slot_field $slot_result 1 2 3] 1] "shard-2.com" + assert_equal [lindex [get_slot_field $slot_result 2 2 3] 1] "shard-3.com" +} + +test "Test restart will keep hostname information" { + # Set a new hostname, reboot and make sure it sticks + R 0 config set cluster-announce-hostname "restart-1.com" + restart_instance redis 0 + set slot_result [R 0 CLUSTER SLOTS] + assert_equal [lindex [get_slot_field $slot_result 0 2 3] 1] "restart-1.com" + + # As a sanity check, make sure everyone eventually agrees + wait_for_cluster_propagation +} + +test "Test hostname validation" { + catch {R 0 config set cluster-announce-hostname [string repeat x 256]} err + assert_match "*Hostnames
must be less than 256 characters*" $err + catch {R 0 config set cluster-announce-hostname "?.com"} err + assert_match "*Hostnames may only contain alphanumeric characters, hyphens or dots*" $err + + # Note this isn't a valid hostname, but it passes our internal validation + R 0 config set cluster-announce-hostname "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-." +} \ No newline at end of file diff --git a/tests/cluster/tests/includes/init-tests.tcl b/tests/cluster/tests/includes/init-tests.tcl index b787962a8..fc5897a1a 100644 --- a/tests/cluster/tests/includes/init-tests.tcl +++ b/tests/cluster/tests/includes/init-tests.tcl @@ -42,6 +42,8 @@ test "Cluster nodes hard reset" { R $id config set loading-process-events-interval-bytes 2097152 R $id config set key-load-delay 0 R $id config set repl-diskless-load disabled + R $id config set cluster-announce-hostname "" + R $id DEBUG DROP-CLUSTER-PACKET-FILTER -1 R $id config rewrite } }