RESP3: Use new aggregate reply API in cluster.c.
This commit is contained in:
parent
c7f80e4f1a
commit
dcbd40cea4
@ -4126,7 +4126,7 @@ void clusterReplyMultiBulkSlots(client *c) {
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
int num_masters = 0;
|
int num_masters = 0;
|
||||||
void *slot_replylen = addDeferredMultiBulkLength(c);
|
void *slot_replylen = addReplyDeferredLen(c);
|
||||||
|
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
|
dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
|
||||||
@ -4146,7 +4146,7 @@ void clusterReplyMultiBulkSlots(client *c) {
|
|||||||
}
|
}
|
||||||
if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
|
if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
|
||||||
int nested_elements = 3; /* slots (2) + master addr (1). */
|
int nested_elements = 3; /* slots (2) + master addr (1). */
|
||||||
void *nested_replylen = addDeferredMultiBulkLength(c);
|
void *nested_replylen = addReplyDeferredLen(c);
|
||||||
|
|
||||||
if (bit && j == CLUSTER_SLOTS-1) j++;
|
if (bit && j == CLUSTER_SLOTS-1) j++;
|
||||||
|
|
||||||
@ -4162,7 +4162,7 @@ void clusterReplyMultiBulkSlots(client *c) {
|
|||||||
start = -1;
|
start = -1;
|
||||||
|
|
||||||
/* First node reply position is always the master */
|
/* First node reply position is always the master */
|
||||||
addReplyMultiBulkLen(c, 3);
|
addReplyArrayLen(c, 3);
|
||||||
addReplyBulkCString(c, node->ip);
|
addReplyBulkCString(c, node->ip);
|
||||||
addReplyLongLong(c, node->port);
|
addReplyLongLong(c, node->port);
|
||||||
addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
|
addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
|
||||||
@ -4172,19 +4172,19 @@ void clusterReplyMultiBulkSlots(client *c) {
|
|||||||
/* This loop is copy/pasted from clusterGenNodeDescription()
|
/* This loop is copy/pasted from clusterGenNodeDescription()
|
||||||
* with modifications for per-slot node aggregation */
|
* with modifications for per-slot node aggregation */
|
||||||
if (nodeFailed(node->slaves[i])) continue;
|
if (nodeFailed(node->slaves[i])) continue;
|
||||||
addReplyMultiBulkLen(c, 3);
|
addReplyArrayLen(c, 3);
|
||||||
addReplyBulkCString(c, node->slaves[i]->ip);
|
addReplyBulkCString(c, node->slaves[i]->ip);
|
||||||
addReplyLongLong(c, node->slaves[i]->port);
|
addReplyLongLong(c, node->slaves[i]->port);
|
||||||
addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
|
addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
|
||||||
nested_elements++;
|
nested_elements++;
|
||||||
}
|
}
|
||||||
setDeferredMultiBulkLength(c, nested_replylen, nested_elements);
|
setDeferredArrayLen(c, nested_replylen, nested_elements);
|
||||||
num_masters++;
|
num_masters++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
setDeferredMultiBulkLength(c, slot_replylen, num_masters);
|
setDeferredArrayLen(c, slot_replylen, num_masters);
|
||||||
}
|
}
|
||||||
|
|
||||||
void clusterCommand(client *c) {
|
void clusterCommand(client *c) {
|
||||||
@ -4548,7 +4548,7 @@ NULL
|
|||||||
|
|
||||||
keys = zmalloc(sizeof(robj*)*maxkeys);
|
keys = zmalloc(sizeof(robj*)*maxkeys);
|
||||||
numkeys = getKeysInSlot(slot, keys, maxkeys);
|
numkeys = getKeysInSlot(slot, keys, maxkeys);
|
||||||
addReplyMultiBulkLen(c,numkeys);
|
addReplyArrayLen(c,numkeys);
|
||||||
for (j = 0; j < numkeys; j++) {
|
for (j = 0; j < numkeys; j++) {
|
||||||
addReplyBulk(c,keys[j]);
|
addReplyBulk(c,keys[j]);
|
||||||
decrRefCount(keys[j]);
|
decrRefCount(keys[j]);
|
||||||
@ -4627,7 +4627,7 @@ NULL
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
addReplyMultiBulkLen(c,n->numslaves);
|
addReplyArrayLen(c,n->numslaves);
|
||||||
for (j = 0; j < n->numslaves; j++) {
|
for (j = 0; j < n->numslaves; j++) {
|
||||||
sds ni = clusterGenNodeDescription(n->slaves[j]);
|
sds ni = clusterGenNodeDescription(n->slaves[j]);
|
||||||
addReplyBulkCString(c,ni);
|
addReplyBulkCString(c,ni);
|
||||||
|
@ -343,7 +343,7 @@ NULL
|
|||||||
long mblen = 0;
|
long mblen = 0;
|
||||||
void *replylen;
|
void *replylen;
|
||||||
|
|
||||||
replylen = addDeferredMultiBulkLength(c);
|
replylen = addReplyDeferredLen(c);
|
||||||
while((de = dictNext(di)) != NULL) {
|
while((de = dictNext(di)) != NULL) {
|
||||||
robj *cobj = dictGetKey(de);
|
robj *cobj = dictGetKey(de);
|
||||||
sds channel = cobj->ptr;
|
sds channel = cobj->ptr;
|
||||||
@ -356,12 +356,12 @@ NULL
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
setDeferredMultiBulkLength(c,replylen,mblen);
|
setDeferredArrayLen(c,replylen,mblen);
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) {
|
||||||
/* PUBSUB NUMSUB [Channel_1 ... Channel_N] */
|
/* PUBSUB NUMSUB [Channel_1 ... Channel_N] */
|
||||||
int j;
|
int j;
|
||||||
|
|
||||||
addReplyMultiBulkLen(c,(c->argc-2)*2);
|
addReplyArrayLen(c,(c->argc-2)*2);
|
||||||
for (j = 2; j < c->argc; j++) {
|
for (j = 2; j < c->argc; j++) {
|
||||||
list *l = dictFetchValue(server.pubsub_channels,c->argv[j]);
|
list *l = dictFetchValue(server.pubsub_channels,c->argv[j]);
|
||||||
|
|
||||||
|
@ -505,7 +505,7 @@ void sortCommand(client *c) {
|
|||||||
addReplyError(c,"One or more scores can't be converted into double");
|
addReplyError(c,"One or more scores can't be converted into double");
|
||||||
} else if (storekey == NULL) {
|
} else if (storekey == NULL) {
|
||||||
/* STORE option not specified, sent the sorting result to client */
|
/* STORE option not specified, sent the sorting result to client */
|
||||||
addReplyMultiBulkLen(c,outputlen);
|
addReplyArrayLen(c,outputlen);
|
||||||
for (j = start; j <= end; j++) {
|
for (j = start; j <= end; j++) {
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
listIter li;
|
listIter li;
|
||||||
|
@ -421,7 +421,7 @@ void lrangeCommand(client *c) {
|
|||||||
rangelen = (end-start)+1;
|
rangelen = (end-start)+1;
|
||||||
|
|
||||||
/* Return the result in form of a multi-bulk reply */
|
/* Return the result in form of a multi-bulk reply */
|
||||||
addReplyMultiBulkLen(c,rangelen);
|
addReplyArrayLen(c,rangelen);
|
||||||
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
|
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
|
||||||
listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);
|
listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);
|
||||||
|
|
||||||
@ -639,10 +639,10 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb
|
|||||||
db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
|
db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
|
||||||
|
|
||||||
/* BRPOP/BLPOP */
|
/* BRPOP/BLPOP */
|
||||||
addReplyMultiBulkLen(receiver,2);
|
addReplyArrayLen(receiver,2);
|
||||||
addReplyBulk(receiver,key);
|
addReplyBulk(receiver,key);
|
||||||
addReplyBulk(receiver,value);
|
addReplyBulk(receiver,value);
|
||||||
|
|
||||||
/* Notify event. */
|
/* Notify event. */
|
||||||
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
|
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
|
||||||
notifyKeyspaceEvent(NOTIFY_LIST,event,key,receiver->db->id);
|
notifyKeyspaceEvent(NOTIFY_LIST,event,key,receiver->db->id);
|
||||||
@ -704,7 +704,7 @@ void blockingPopGenericCommand(client *c, int where) {
|
|||||||
robj *value = listTypePop(o,where);
|
robj *value = listTypePop(o,where);
|
||||||
serverAssert(value != NULL);
|
serverAssert(value != NULL);
|
||||||
|
|
||||||
addReplyMultiBulkLen(c,2);
|
addReplyArrayLen(c,2);
|
||||||
addReplyBulk(c,c->argv[j]);
|
addReplyBulk(c,c->argv[j]);
|
||||||
addReplyBulk(c,value);
|
addReplyBulk(c,value);
|
||||||
decrRefCount(value);
|
decrRefCount(value);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user