33 #include "endianconv.h"
35 #include <sys/types.h>
36 #include <sys/socket.h>
37 #include <arpa/inet.h>
100 "Loading the cluster node config from %s: %s",
108 if (
fstat(fileno(fp),&
sb) != -1 &&
sb.st_size == 0) {
120 maxline = 1024+CLUSTER_SLOTS*128;
121 line = zmalloc(maxline);
122 while(fgets(
line,maxline,fp) !=
NULL) {
125 clusterNode *
n, *master;
131 if (
line[0] ==
'\n' ||
line[0] ==
'\0')
continue;
139 if (strcasecmp(
argv[0],
"vars") == 0) {
140 for (j = 1; j < argc; j += 2) {
141 if (strcasecmp(
argv[j],
"currentEpoch") == 0) {
142 server.cluster->currentEpoch =
144 }
else if (strcasecmp(
argv[j],
"lastVoteEpoch") == 0) {
145 server.cluster->lastVoteEpoch =
148 serverLog(LL_WARNING,
149 "Skipping unknown cluster config variable '%s'",
153 sdsfreesplitres(
argv,argc);
158 if (argc < 8)
goto fmterr;
167 if ((
p = strrchr(
argv[1],
':')) ==
NULL)
goto fmterr;
171 char *busp = strchr(port,
'@');
176 n->port = atoi(port);
180 n->cport = busp ? atoi(busp) :
n->port + CLUSTER_PORT_INCR;
187 if (!strcasecmp(
s,
"myself")) {
188 serverAssert(server.cluster->myself ==
NULL);
189 myself = server.cluster->myself =
n;
190 n->flags |= CLUSTER_NODE_MYSELF;
191 }
else if (!strcasecmp(
s,
"master")) {
192 n->flags |= CLUSTER_NODE_MASTER;
193 }
else if (!strcasecmp(
s,
"slave")) {
194 n->flags |= CLUSTER_NODE_SLAVE;
195 }
else if (!strcasecmp(
s,
"fail?")) {
196 n->flags |= CLUSTER_NODE_PFAIL;
197 }
else if (!strcasecmp(
s,
"fail")) {
198 n->flags |= CLUSTER_NODE_FAIL;
199 n->fail_time = mstime();
200 }
else if (!strcasecmp(
s,
"handshake")) {
201 n->flags |= CLUSTER_NODE_HANDSHAKE;
202 }
else if (!strcasecmp(
s,
"noaddr")) {
203 n->flags |= CLUSTER_NODE_NOADDR;
204 }
else if (!strcasecmp(
s,
"noflags")) {
207 serverPanic(
"Unknown flag in redis cluster config file");
214 if (
argv[3][0] !=
'-') {
225 if (atoi(
argv[4]))
n->ping_sent = mstime();
226 if (atoi(
argv[5]))
n->pong_received = mstime();
229 n->configEpoch = strtoull(
argv[6],
NULL,10);
232 for (j = 8; j < argc; j++) {
235 if (
argv[j][0] ==
'[') {
241 p = strchr(
argv[j],
'-');
242 serverAssert(
p !=
NULL);
245 slot = atoi(
argv[j]+1);
252 if (direction ==
'>') {
253 server.cluster->migrating_slots_to[slot] = cn;
255 server.cluster->importing_slots_from[slot] = cn;
258 }
else if ((
p = strchr(
argv[j],
'-')) !=
NULL) {
268 sdsfreesplitres(
argv,argc);
271 if (server.cluster->myself ==
NULL)
goto fmterr;
276 serverLog(LL_NOTICE,
"Node configuration loaded, I'm %.40s",
myself->name);
287 serverLog(LL_WARNING,
288 "Unrecoverable error: corrupted cluster config file.");
312 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;
317 ci = sdscatprintf(ci,
"vars currentEpoch %llu lastVoteEpoch %llu\n",
318 (
unsigned long long) server.cluster->currentEpoch,
319 (
unsigned long long) server.cluster->lastVoteEpoch);
320 content_size = sdslen(ci);
327 if (
sb.st_size > (
off_t)content_size) {
328 ci = sdsgrowzero(ci,
sb.st_size);
329 memset(ci+content_size,
'\n',
sb.st_size-content_size);
334 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
340 if (content_size != sdslen(ci) &&
ftruncate(
fd,content_size) == -1) {
355 serverLog(LL_WARNING,
"Fatal: can't update cluster config file.");
380 serverLog(LL_WARNING,
381 "Can't open %s in order to acquire a lock: %s",
387 if (errno == EWOULDBLOCK) {
388 serverLog(LL_WARNING,
389 "Sorry, the cluster configuration file %s is already used "
390 "by a different Redis Cluster node. Please make sure that "
391 "different nodes use different cluster configuration "
394 serverLog(LL_WARNING,
395 "Impossible to lock %s: %s",
filename, strerror(errno));
410 server.cluster = zmalloc(
sizeof(clusterState));
411 server.cluster->myself =
NULL;
412 server.cluster->currentEpoch = 0;
413 server.cluster->state = CLUSTER_FAIL;
414 server.cluster->size = 1;
415 server.cluster->todo_before_sleep = 0;
416 server.cluster->nodes = dictCreate(&clusterNodesDictType,
NULL);
417 server.cluster->nodes_black_list =
418 dictCreate(&clusterNodesBlackListDictType,
NULL);
419 server.cluster->failover_auth_time = 0;
420 server.cluster->failover_auth_count = 0;
421 server.cluster->failover_auth_rank = 0;
422 server.cluster->failover_auth_epoch = 0;
423 server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
424 server.cluster->lastVoteEpoch = 0;
425 for (
int i = 0;
i < CLUSTERMSG_TYPE_COUNT;
i++) {
426 server.cluster->stats_bus_messages_sent[
i] = 0;
427 server.cluster->stats_bus_messages_received[
i] = 0;
429 server.cluster->stats_pfail_nodes = 0;
430 memset(server.cluster->slots,0,
sizeof(server.cluster->slots));
442 myself = server.cluster->myself =
444 serverLog(LL_NOTICE,
"No cluster configuration found, I'm %.40s",
452 server.cfd_count = 0;
457 if (server.port > (65535-CLUSTER_PORT_INCR)) {
458 serverLog(LL_WARNING,
"Redis port number too high. "
459 "Cluster communication port is 10,000 port "
460 "numbers higher than your Redis port. "
461 "Your Redis port number must be "
462 "lower than 55535.");
466 if (listenToPort(server.port+CLUSTER_PORT_INCR,
467 server.cfd,&server.cfd_count) == C_ERR)
473 for (j = 0; j < server.cfd_count; j++) {
474 if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
476 serverPanic(
"Unrecoverable error creating Redis Cluster "
482 server.cluster->slots_to_keys = raxNew();
483 memset(server.cluster->slots_keys_count,0,
484 sizeof(server.cluster->slots_keys_count));
488 myself->port = server.port;
489 myself->cport = server.port+CLUSTER_PORT_INCR;
490 if (server.cluster_announce_port)
491 myself->port = server.cluster_announce_port;
492 if (server.cluster_announce_bus_port)
493 myself->cport = server.cluster_announce_bus_port;
495 server.cluster->mf_end = 0;
514 if (nodeIsSlave(
myself)) {
516 replicationUnsetMaster();
517 emptyDb(-1,EMPTYDB_NO_FLAGS,
NULL);
528 di = dictGetSafeIterator(server.cluster->nodes);
529 while((de = dictNext(di)) !=
NULL) {
530 clusterNode *node = dictGetVal(de);
532 if (node ==
myself)
continue;
535 dictReleaseIterator(di);
541 server.cluster->currentEpoch = 0;
542 server.cluster->lastVoteEpoch = 0;
544 serverLog(LL_WARNING,
"configEpoch set to 0 via CLUSTER RESET HARD");
548 oldname = sdsnewlen(
myself->name, CLUSTER_NAMELEN);
549 dictDelete(server.cluster->nodes,oldname);
551 getRandomHexChars(
myself->name, CLUSTER_NAMELEN);
553 serverLog(LL_NOTICE,
"Node hard reset, now I'm %.40s",
myself->name);
558 CLUSTER_TODO_UPDATE_STATE|
559 CLUSTER_TODO_FSYNC_CONFIG);
567 clusterLink *
link = zmalloc(
sizeof(*
link));
568 link->ctime = mstime();
569 link->sndbuf = sdsempty();
570 link->rcvbuf = sdsempty();
580 if (
link->fd != -1) {
581 aeDeleteFileEvent(server.el,
link->fd, AE_WRITABLE);
582 aeDeleteFileEvent(server.el,
link->fd, AE_READABLE);
584 sdsfree(
link->sndbuf);
585 sdsfree(
link->rcvbuf);
592 #define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
596 char cip[NET_IP_STR_LEN];
604 if (server.masterhost ==
NULL && server.loading)
return;
607 cfd = anetTcpAccept(server.neterr,
fd, cip,
sizeof(cip), &cport);
608 if (cfd == ANET_ERR) {
609 if (errno != EWOULDBLOCK)
610 serverLog(LL_VERBOSE,
611 "Error accepting cluster node: %s", server.neterr);
614 anetNonBlock(
NULL,cfd);
615 anetEnableTcpNoDelay(
NULL,cfd);
618 serverLog(LL_VERBOSE,
"Accepted cluster node %s:%d", cip, cport);
643 for (
s = 0;
s < keylen;
s++)
644 if (
key[
s] ==
'{')
break;
647 if (
s == keylen)
return crc16(
key,keylen) & 0x3FFF;
650 for (
e =
s+1;
e < keylen;
e++)
651 if (
key[
e] ==
'}')
break;
654 if (
e == keylen ||
e ==
s+1)
return crc16(
key,keylen) & 0x3FFF;
658 return crc16(
key+
s+1,
e-
s-1) & 0x3FFF;
673 clusterNode *node = zmalloc(
sizeof(*node));
676 memcpy(node->name, nodename, CLUSTER_NAMELEN);
678 getRandomHexChars(node->name, CLUSTER_NAMELEN);
679 node->ctime = mstime();
680 node->configEpoch = 0;
682 memset(node->slots,0,
sizeof(node->slots));
686 node->slaveof =
NULL;
687 node->ping_sent = node->pong_received = 0;
690 memset(node->ip,0,
sizeof(node->ip));
693 node->fail_reports = listCreate();
694 node->voted_time = 0;
695 node->orphaned_time = 0;
696 node->repl_offset_time = 0;
697 node->repl_offset = 0;
698 listSetFreeMethod(node->fail_reports,zfree);
713 list *l = failing->fail_reports;
716 clusterNodeFailReport *fr;
721 while ((ln = listNext(&li)) !=
NULL) {
723 if (fr->node == sender) {
730 fr = zmalloc(
sizeof(*fr));
733 listAddNodeTail(l,fr);
743 list *l = node->fail_reports;
746 clusterNodeFailReport *fr;
747 mstime_t maxtime = server.cluster_node_timeout *
748 CLUSTER_FAIL_REPORT_VALIDITY_MULT;
749 mstime_t now = mstime();
752 while ((ln = listNext(&li)) !=
NULL) {
754 if (now - fr->time > maxtime) listDelNode(l,ln);
770 list *l = node->fail_reports;
773 clusterNodeFailReport *fr;
777 while ((ln = listNext(&li)) !=
NULL) {
779 if (fr->node == sender)
break;
794 return listLength(node->fail_reports);
800 for (j = 0; j < master->numslaves; j++) {
801 if (master->slaves[j] == slave) {
802 if ((j+1) < master->numslaves) {
803 int remaining_slaves = (master->numslaves - j) - 1;
804 memmove(master->slaves+j,master->slaves+(j+1),
805 (
sizeof(*master->slaves) * remaining_slaves));
808 if (master->numslaves == 0)
809 master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
820 for (j = 0; j < master->numslaves; j++)
821 if (master->slaves[j] == slave)
return C_ERR;
822 master->slaves = zrealloc(master->slaves,
823 sizeof(clusterNode*)*(master->numslaves+1));
824 master->slaves[master->numslaves] = slave;
826 master->flags |= CLUSTER_NODE_MIGRATE_TO;
833 for (j = 0; j <
n->numslaves; j++)
834 if (!nodeFailed(
n->slaves[j])) okslaves++;
845 for (j = 0; j <
n->numslaves; j++)
846 n->slaves[j]->slaveof =
NULL;
852 nodename = sdsnewlen(
n->name, CLUSTER_NAMELEN);
853 serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
858 listRelease(
n->fail_reports);
867 retval = dictAdd(server.cluster->nodes,
868 sdsnewlen(node->name,CLUSTER_NAMELEN), node);
869 return (retval == DICT_OK) ? C_OK : C_ERR;
889 for (j = 0; j < CLUSTER_SLOTS; j++) {
890 if (server.cluster->importing_slots_from[j] == delnode)
891 server.cluster->importing_slots_from[j] =
NULL;
892 if (server.cluster->migrating_slots_to[j] == delnode)
893 server.cluster->migrating_slots_to[j] =
NULL;
894 if (server.cluster->slots[j] == delnode)
899 di = dictGetSafeIterator(server.cluster->nodes);
900 while((de = dictNext(di)) !=
NULL) {
901 clusterNode *node = dictGetVal(de);
903 if (node == delnode)
continue;
906 dictReleaseIterator(di);
914 sds
s = sdsnewlen(
name, CLUSTER_NAMELEN);
917 de = dictFind(server.cluster->nodes,
s);
920 return dictGetVal(de);
929 sds
s = sdsnewlen(node->name, CLUSTER_NAMELEN);
931 serverLog(LL_DEBUG,
"Renaming node %.40s into %.40s",
932 node->name, newname);
933 retval = dictDelete(server.cluster->nodes,
s);
935 serverAssert(retval == DICT_OK);
936 memcpy(node->name, newname, CLUSTER_NAMELEN);
951 di = dictGetSafeIterator(server.cluster->nodes);
952 while((de = dictNext(di)) !=
NULL) {
953 clusterNode *node = dictGetVal(de);
954 if (node->configEpoch >
max)
max = node->configEpoch;
956 dictReleaseIterator(di);
957 if (max < server.cluster->currentEpoch)
max = server.cluster->currentEpoch;
993 if (
myself->configEpoch == 0 ||
994 myself->configEpoch != maxEpoch)
996 server.cluster->currentEpoch++;
997 myself->configEpoch = server.cluster->currentEpoch;
999 CLUSTER_TODO_FSYNC_CONFIG);
1000 serverLog(LL_WARNING,
1001 "New configEpoch set to %llu",
1002 (
unsigned long long)
myself->configEpoch);
1057 if (sender->configEpoch !=
myself->configEpoch ||
1058 !nodeIsMaster(sender) || !nodeIsMaster(
myself))
return;
1060 if (memcmp(sender->name,
myself->name,CLUSTER_NAMELEN) <= 0)
return;
1062 server.cluster->currentEpoch++;
1063 myself->configEpoch = server.cluster->currentEpoch;
1065 serverLog(LL_VERBOSE,
1066 "WARNING: configEpoch collision with node %.40s."
1067 " configEpoch set to %llu",
1069 (
unsigned long long)
myself->configEpoch);
1094 #define CLUSTER_BLACKLIST_TTL 60
1107 di = dictGetSafeIterator(server.cluster->nodes_black_list);
1108 while((de = dictNext(di)) !=
NULL) {
1109 int64_t expire = dictGetUnsignedIntegerVal(de);
1111 if (expire < server.unixtime)
1112 dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
1114 dictReleaseIterator(di);
1120 sds
id = sdsnewlen(node->name,CLUSTER_NAMELEN);
1123 if (dictAdd(server.cluster->nodes_black_list,
id,
NULL) == DICT_OK) {
1128 de = dictFind(server.cluster->nodes_black_list,
id);
1137 sds
id = sdsnewlen(nodeid,CLUSTER_NAMELEN);
1141 retval = dictFind(server.cluster->nodes_black_list,
id) !=
NULL;
1173 int needed_quorum = (server.cluster->size / 2) + 1;
1175 if (!nodeTimedOut(node))
return;
1176 if (nodeFailed(node))
return;
1180 if (nodeIsMaster(
myself)) failures++;
1181 if (failures < needed_quorum)
return;
1183 serverLog(LL_NOTICE,
1184 "Marking node %.40s as failing (quorum reached).", node->name);
1187 node->flags &= ~CLUSTER_NODE_PFAIL;
1188 node->flags |= CLUSTER_NODE_FAIL;
1189 node->fail_time = mstime();
1201 mstime_t now = mstime();
1203 serverAssert(nodeFailed(node));
1207 if (nodeIsSlave(node) || node->numslots == 0) {
1208 serverLog(LL_NOTICE,
1209 "Clear FAIL state for node %.40s: %s is reachable again.",
1211 nodeIsSlave(node) ?
"slave" :
"master without slots");
1212 node->flags &= ~CLUSTER_NODE_FAIL;
1220 if (nodeIsMaster(node) && node->numslots > 0 &&
1221 (now - node->fail_time) >
1222 (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT))
1224 serverLog(LL_NOTICE,
1225 "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
1227 node->flags &= ~CLUSTER_NODE_FAIL;
1239 di = dictGetSafeIterator(server.cluster->nodes);
1240 while((de = dictNext(di)) !=
NULL) {
1241 clusterNode *node = dictGetVal(de);
1243 if (!nodeInHandshake(node))
continue;
1244 if (!strcasecmp(node->ip,
ip) &&
1245 node->port == port &&
1246 node->cport == cport)
break;
1248 dictReleaseIterator(di);
1261 char norm_ip[NET_IP_STR_LEN];
1262 struct sockaddr_storage sa;
1279 if (port <= 0 || port > 65535 || cport <= 0 || cport > 65535) {
1286 memset(norm_ip,0,NET_IP_STR_LEN);
1290 norm_ip,NET_IP_STR_LEN);
1294 norm_ip,NET_IP_STR_LEN);
1305 memcpy(
n->ip,norm_ip,
sizeof(
n->ip));
1318 clusterMsgDataGossip *
g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
1327 serverLog(LL_DEBUG,
"GOSSIP %.40s %s:%d@%d %s",
1340 if (sender && nodeIsMaster(sender) && node !=
myself) {
1341 if (
flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
1343 serverLog(LL_VERBOSE,
1344 "Node %.40s reported node %.40s as not reachable.",
1345 sender->name, node->name);
1350 serverLog(LL_VERBOSE,
1351 "Node %.40s reported node %.40s is back online.",
1352 sender->name, node->name);
1361 if (!(
flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
1362 node->ping_sent == 0 &&
1365 mstime_t pongtime = ntohl(
g->pong_received);
1372 if (pongtime <= (server.mstime+500) &&
1373 pongtime > node->pong_received)
1375 node->pong_received = pongtime;
1384 if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
1385 !(
flags & CLUSTER_NODE_NOADDR) &&
1386 !(
flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
1387 (strcasecmp(node->ip,
g->ip) ||
1388 node->port !=
ntohs(
g->port) ||
1389 node->cport !=
ntohs(
g->cport)))
1392 memcpy(node->ip,
g->ip,NET_IP_STR_LEN);
1393 node->port =
ntohs(
g->port);
1394 node->cport =
ntohs(
g->cport);
1395 node->flags &= ~CLUSTER_NODE_NOADDR;
1405 !(
flags & CLUSTER_NODE_NOADDR) &&
1421 if (announced_ip[0] !=
'\0') {
1422 memcpy(
buf,announced_ip,NET_IP_STR_LEN);
1423 buf[NET_IP_STR_LEN-1] =
'\0';
1425 anetPeerToString(
link->fd,
buf, NET_IP_STR_LEN,
NULL);
1444 char ip[NET_IP_STR_LEN] = {0};
1445 int port =
ntohs(hdr->port);
1446 int cport =
ntohs(hdr->cport);
1454 if (
link == node->link)
return 0;
1457 if (node->port == port && node->cport == cport &&
1458 strcmp(
ip,node->ip) == 0)
return 0;
1463 node->cport = cport;
1465 node->flags &= ~CLUSTER_NODE_NOADDR;
1466 serverLog(LL_WARNING,
"Address updated for node %.40s, now %s:%d",
1467 node->name, node->ip, node->port);
1472 replicationSetMaster(node->ip, node->port);
1480 if (nodeIsMaster(
n))
return;
1484 if (
n !=
myself)
n->flags |= CLUSTER_NODE_MIGRATE_TO;
1486 n->flags &= ~CLUSTER_NODE_SLAVE;
1487 n->flags |= CLUSTER_NODE_MASTER;
1492 CLUSTER_TODO_UPDATE_STATE);
1508 clusterNode *curmaster, *newmaster =
NULL;
1516 uint16_t dirty_slots[CLUSTER_SLOTS];
1517 int dirty_slots_count = 0;
1525 serverLog(LL_WARNING,
"Discarding UPDATE message about myself.");
1529 for (j = 0; j < CLUSTER_SLOTS; j++) {
1532 if (server.cluster->slots[j] == sender)
continue;
1538 if (server.cluster->importing_slots_from[j])
continue;
1544 if (server.cluster->slots[j] ==
NULL ||
1545 server.cluster->slots[j]->configEpoch < senderConfigEpoch)
1549 if (server.cluster->slots[j] ==
myself &&
1550 countKeysInSlot(j) &&
1553 dirty_slots[dirty_slots_count] = j;
1554 dirty_slots_count++;
1557 if (server.cluster->slots[j] == curmaster)
1562 CLUSTER_TODO_UPDATE_STATE|
1563 CLUSTER_TODO_FSYNC_CONFIG);
1575 if (newmaster && curmaster->numslots == 0) {
1576 serverLog(LL_WARNING,
1577 "Configuration change detected. Reconfiguring myself "
1578 "as a replica of %.40s", sender->name);
1581 CLUSTER_TODO_UPDATE_STATE|
1582 CLUSTER_TODO_FSYNC_CONFIG);
1583 }
else if (dirty_slots_count) {
1591 for (j = 0; j < dirty_slots_count; j++)
1592 delKeysInSlot(dirty_slots[j]);
1606 clusterMsg *hdr = (clusterMsg*)
link->rcvbuf;
1607 uint32_t totlen = ntohl(hdr->totlen);
1610 if (
type < CLUSTERMSG_TYPE_COUNT)
1611 server.cluster->stats_bus_messages_received[
type]++;
1612 serverLog(LL_DEBUG,
"--- Processing packet of type %d, %lu bytes",
1613 type, (
unsigned long) totlen);
1616 if (totlen < 16)
return 1;
1617 if (totlen > sdslen(
link->rcvbuf))
return 1;
1619 if (
ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
1625 uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
1626 clusterNode *sender;
1628 if (
type == CLUSTERMSG_TYPE_PING ||
type == CLUSTERMSG_TYPE_PONG ||
1629 type == CLUSTERMSG_TYPE_MEET)
1634 explen =
sizeof(clusterMsg)-
sizeof(
union clusterMsgData);
1635 explen += (
sizeof(clusterMsgDataGossip)*
count);
1636 if (totlen != explen)
return 1;
1637 }
else if (
type == CLUSTERMSG_TYPE_FAIL) {
1638 uint32_t explen =
sizeof(clusterMsg)-
sizeof(
union clusterMsgData);
1640 explen +=
sizeof(clusterMsgDataFail);
1641 if (totlen != explen)
return 1;
1642 }
else if (
type == CLUSTERMSG_TYPE_PUBLISH) {
1643 uint32_t explen =
sizeof(clusterMsg)-
sizeof(
union clusterMsgData);
1645 explen +=
sizeof(clusterMsgDataPublish) -
1647 ntohl(hdr->data.publish.msg.channel_len) +
1648 ntohl(hdr->data.publish.msg.message_len);
1649 if (totlen != explen)
return 1;
1650 }
else if (
type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
1651 type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
1652 type == CLUSTERMSG_TYPE_MFSTART)
1654 uint32_t explen =
sizeof(clusterMsg)-
sizeof(
union clusterMsgData);
1656 if (totlen != explen)
return 1;
1657 }
else if (
type == CLUSTERMSG_TYPE_UPDATE) {
1658 uint32_t explen =
sizeof(clusterMsg)-
sizeof(
union clusterMsgData);
1660 explen +=
sizeof(clusterMsgDataUpdate);
1661 if (totlen != explen)
return 1;
1666 if (sender && !nodeInHandshake(sender)) {
1668 senderCurrentEpoch = ntohu64(hdr->currentEpoch);
1669 senderConfigEpoch = ntohu64(hdr->configEpoch);
1670 if (senderCurrentEpoch > server.cluster->currentEpoch)
1671 server.cluster->currentEpoch = senderCurrentEpoch;
1673 if (senderConfigEpoch > sender->configEpoch) {
1674 sender->configEpoch = senderConfigEpoch;
1676 CLUSTER_TODO_FSYNC_CONFIG);
1679 sender->repl_offset = ntohu64(hdr->offset);
1680 sender->repl_offset_time = mstime();
1683 if (server.cluster->mf_end &&
1685 myself->slaveof == sender &&
1686 hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
1687 server.cluster->mf_master_offset == 0)
1689 server.cluster->mf_master_offset = sender->repl_offset;
1690 serverLog(LL_WARNING,
1691 "Received replication offset for paused "
1692 "master manual failover: %lld",
1693 server.cluster->mf_master_offset);
1698 if (
type == CLUSTERMSG_TYPE_PING ||
type == CLUSTERMSG_TYPE_MEET) {
1699 serverLog(LL_DEBUG,
"Ping packet received: %p", (
void*)
link->node);
1712 if ((
type == CLUSTERMSG_TYPE_MEET ||
myself->ip[0] ==
'\0') &&
1713 server.cluster_announce_ip ==
NULL)
1715 char ip[NET_IP_STR_LEN];
1721 serverLog(LL_WARNING,
"IP address for this node updated to %s",
1731 if (!sender &&
type == CLUSTERMSG_TYPE_MEET) {
1736 node->port =
ntohs(hdr->port);
1737 node->cport =
ntohs(hdr->cport);
1745 if (!sender &&
type == CLUSTERMSG_TYPE_MEET)
1753 if (
type == CLUSTERMSG_TYPE_PING ||
type == CLUSTERMSG_TYPE_PONG ||
1754 type == CLUSTERMSG_TYPE_MEET)
1756 serverLog(LL_DEBUG,
"%s packet received: %p",
1757 type == CLUSTERMSG_TYPE_PING ?
"ping" :
"pong",
1760 if (nodeInHandshake(
link->node)) {
1764 serverLog(LL_VERBOSE,
1765 "Handshake: we already know node %.40s, "
1766 "updating the address if needed.", sender->name);
1770 CLUSTER_TODO_UPDATE_STATE);
1781 serverLog(LL_DEBUG,
"Handshake with node %.40s completed.",
1783 link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
1784 link->node->flags |=
flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
1786 }
else if (memcmp(
link->node->name,hdr->sender,
1787 CLUSTER_NAMELEN) != 0)
1792 serverLog(LL_DEBUG,
"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
1794 (
int)(mstime()-(
link->node->ctime)),
1796 link->node->flags |= CLUSTER_NODE_NOADDR;
1797 link->node->ip[0] =
'\0';
1798 link->node->port = 0;
1799 link->node->cport = 0;
1807 if (sender &&
type == CLUSTERMSG_TYPE_PING &&
1808 !nodeInHandshake(sender) &&
1812 CLUSTER_TODO_UPDATE_STATE);
1816 if (
link->node &&
type == CLUSTERMSG_TYPE_PONG) {
1817 link->node->pong_received = mstime();
1818 link->node->ping_sent = 0;
1826 if (nodeTimedOut(
link->node)) {
1827 link->node->flags &= ~CLUSTER_NODE_PFAIL;
1829 CLUSTER_TODO_UPDATE_STATE);
1830 }
else if (nodeFailed(
link->node)) {
1837 if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
1838 sizeof(hdr->slaveof)))
1846 if (nodeIsMaster(sender)) {
1849 sender->flags &= ~(CLUSTER_NODE_MASTER|
1850 CLUSTER_NODE_MIGRATE_TO);
1851 sender->flags |= CLUSTER_NODE_SLAVE;
1855 CLUSTER_TODO_UPDATE_STATE);
1859 if (master && sender->slaveof != master) {
1860 if (sender->slaveof)
1863 sender->slaveof = master;
1880 clusterNode *sender_master =
NULL;
1881 int dirty_slots = 0;
1884 sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
1885 if (sender_master) {
1886 dirty_slots = memcmp(sender_master->slots,
1887 hdr->myslots,
sizeof(hdr->myslots)) != 0;
1894 if (sender && nodeIsMaster(sender) && dirty_slots)
1915 if (sender && dirty_slots) {
1918 for (j = 0; j < CLUSTER_SLOTS; j++) {
1920 if (server.cluster->slots[j] == sender ||
1921 server.cluster->slots[j] ==
NULL)
continue;
1922 if (server.cluster->slots[j]->configEpoch >
1925 serverLog(LL_VERBOSE,
1926 "Node %.40s has old slots configuration, sending "
1927 "an UPDATE message about %.40s",
1928 sender->name, server.cluster->slots[j]->name);
1930 server.cluster->slots[j]);
1944 nodeIsMaster(
myself) && nodeIsMaster(sender) &&
1945 senderConfigEpoch ==
myself->configEpoch)
1952 }
else if (
type == CLUSTERMSG_TYPE_FAIL) {
1953 clusterNode *failing;
1958 !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
1960 serverLog(LL_NOTICE,
1961 "FAIL message received from %.40s about %.40s",
1962 hdr->sender, hdr->data.fail.about.nodename);
1963 failing->flags |= CLUSTER_NODE_FAIL;
1964 failing->fail_time = mstime();
1965 failing->flags &= ~CLUSTER_NODE_PFAIL;
1967 CLUSTER_TODO_UPDATE_STATE);
1970 serverLog(LL_NOTICE,
1971 "Ignoring FAIL message from unknown node %.40s about %.40s",
1972 hdr->sender, hdr->data.fail.about.nodename);
1974 }
else if (
type == CLUSTERMSG_TYPE_PUBLISH) {
1980 if (dictSize(server.pubsub_channels) ||
1981 listLength(server.pubsub_patterns))
1983 channel_len = ntohl(hdr->data.publish.msg.channel_len);
1984 message_len = ntohl(hdr->data.publish.msg.message_len);
1985 channel = createStringObject(
1986 (
char*)hdr->data.publish.msg.bulk_data,channel_len);
1988 (
char*)hdr->data.publish.msg.bulk_data+channel_len,
1990 pubsubPublishMessage(channel,
message);
1991 decrRefCount(channel);
1994 }
else if (
type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
1995 if (!sender)
return 1;
1997 }
else if (
type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
1998 if (!sender)
return 1;
2002 if (nodeIsMaster(sender) && sender->numslots > 0 &&
2003 senderCurrentEpoch >= server.cluster->failover_auth_epoch)
2005 server.cluster->failover_auth_count++;
2010 }
else if (
type == CLUSTERMSG_TYPE_MFSTART) {
2013 if (!sender || sender->slaveof !=
myself)
return 1;
2017 server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
2018 server.cluster->mf_slave = sender;
2019 pauseClients(mstime()+(CLUSTER_MF_TIMEOUT*2));
2020 serverLog(LL_WARNING,
"Manual failover requested by slave %.40s.",
2022 }
else if (
type == CLUSTERMSG_TYPE_UPDATE) {
2025 ntohu64(hdr->data.update.nodecfg.configEpoch);
2027 if (!sender)
return 1;
2030 if (
n->configEpoch >= reportedConfigEpoch)
return 1;
2036 n->configEpoch = reportedConfigEpoch;
2038 CLUSTER_TODO_FSYNC_CONFIG);
2043 hdr->data.update.nodecfg.slots);
2045 serverLog(LL_WARNING,
"Received unknown packet type: %d",
type);
2064 clusterLink *
link = (clusterLink*) privdata;
2070 if (nwritten <= 0) {
2071 serverLog(LL_DEBUG,
"I/O error writing to node link: %s",
2076 sdsrange(
link->sndbuf,nwritten,-1);
2077 if (sdslen(
link->sndbuf) == 0)
2078 aeDeleteFileEvent(server.el,
link->fd, AE_WRITABLE);
2085 char buf[
sizeof(clusterMsg)];
2088 clusterLink *
link = (clusterLink*) privdata;
2089 unsigned int readlen, rcvbuflen;
2094 rcvbuflen = sdslen(
link->rcvbuf);
2095 if (rcvbuflen < 8) {
2098 readlen = 8 - rcvbuflen;
2101 hdr = (clusterMsg*)
link->rcvbuf;
2102 if (rcvbuflen == 8) {
2105 if (memcmp(hdr->sig,
"RCmb",4) != 0 ||
2106 ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
2108 serverLog(LL_WARNING,
2109 "Bad message length or signature received "
2110 "from Cluster bus.");
2115 readlen = ntohl(hdr->totlen) - rcvbuflen;
2116 if (readlen >
sizeof(
buf)) readlen =
sizeof(
buf);
2120 if (nread == -1 && errno ==
EAGAIN)
return;
2124 serverLog(LL_DEBUG,
"I/O error reading from node link: %s",
2125 (nread == 0) ?
"connection closed" : strerror(errno));
2131 hdr = (clusterMsg*)
link->rcvbuf;
2136 if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
2138 sdsfree(
link->rcvbuf);
2139 link->rcvbuf = sdsempty();
2153 if (sdslen(
link->sndbuf) == 0 && msglen != 0)
2154 aeCreateFileEvent(server.el,
link->fd,AE_WRITABLE,
2157 link->sndbuf = sdscatlen(
link->sndbuf,
msg, msglen);
2160 clusterMsg *hdr = (clusterMsg*)
msg;
2162 if (
type < CLUSTERMSG_TYPE_COUNT)
2163 server.cluster->stats_bus_messages_sent[
type]++;
2176 di = dictGetSafeIterator(server.cluster->nodes);
2177 while((de = dictNext(di)) !=
NULL) {
2178 clusterNode *node = dictGetVal(de);
2180 if (!node->link)
continue;
2181 if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
2185 dictReleaseIterator(di);
2193 clusterNode *master;
2202 memset(hdr,0,
sizeof(*hdr));
2203 hdr->ver =
htons(CLUSTER_PROTO_VER);
2214 memset(hdr->myip,0,NET_IP_STR_LEN);
2215 if (server.cluster_announce_ip) {
2216 strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
2217 hdr->myip[NET_IP_STR_LEN-1] =
'\0';
2221 int announced_port = server.cluster_announce_port ?
2222 server.cluster_announce_port : server.port;
2223 int announced_cport = server.cluster_announce_bus_port ?
2224 server.cluster_announce_bus_port :
2225 (server.port + CLUSTER_PORT_INCR);
2227 memcpy(hdr->myslots,master->slots,
sizeof(hdr->myslots));
2228 memset(hdr->slaveof,0,CLUSTER_NAMELEN);
2230 memcpy(hdr->slaveof,
myself->slaveof->name, CLUSTER_NAMELEN);
2231 hdr->port =
htons(announced_port);
2232 hdr->cport =
htons(announced_cport);
2234 hdr->state = server.cluster->state;
2237 hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
2238 hdr->configEpoch = htonu64(master->configEpoch);
2242 offset = replicationGetSlaveOffset();
2244 offset = server.master_repl_offset;
2245 hdr->offset = htonu64(
offset);
2248 if (nodeIsMaster(
myself) && server.cluster->mf_end)
2249 hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
2253 if (
type == CLUSTERMSG_TYPE_FAIL) {
2254 totlen =
sizeof(clusterMsg)-
sizeof(
union clusterMsgData);
2255 totlen +=
sizeof(clusterMsgDataFail);
2256 }
else if (
type == CLUSTERMSG_TYPE_UPDATE) {
2257 totlen =
sizeof(clusterMsg)-
sizeof(
union clusterMsgData);
2258 totlen +=
sizeof(clusterMsgDataUpdate);
2260 hdr->totlen = htonl(totlen);
2269 for (j = 0; j <
count; j++) {
2270 if (memcmp(hdr->data.ping.gossip[j].nodename,
n->name,
2271 CLUSTER_NAMELEN) == 0)
break;
2279 clusterMsgDataGossip *gossip;
2280 gossip = &(hdr->data.ping.gossip[
i]);
2281 memcpy(gossip->nodename,
n->name,CLUSTER_NAMELEN);
2282 gossip->ping_sent = htonl(
n->ping_sent/1000);
2283 gossip->pong_received = htonl(
n->pong_received/1000);
2284 memcpy(gossip->ip,
n->ip,
sizeof(
n->ip));
2285 gossip->port =
htons(
n->port);
2286 gossip->cport =
htons(
n->cport);
2287 gossip->flags =
htons(
n->flags);
2288 gossip->notused1 = 0;
2296 int gossipcount = 0;
2303 int freshnodes = dictSize(server.cluster->nodes)-2;
2331 wanted = floor(dictSize(server.cluster->nodes)/10);
2332 if (wanted < 3) wanted = 3;
2333 if (wanted > freshnodes) wanted = freshnodes;
2337 int pfail_wanted = server.cluster->stats_pfail_nodes;
2342 totlen =
sizeof(clusterMsg)-
sizeof(
union clusterMsgData);
2343 totlen += (
sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
2346 if (totlen < (
int)
sizeof(clusterMsg)) totlen =
sizeof(clusterMsg);
2348 hdr = (clusterMsg*)
buf;
2351 if (
link->node &&
type == CLUSTERMSG_TYPE_PING)
2352 link->node->ping_sent = mstime();
2356 int maxiterations = wanted*3;
2357 while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
2358 dictEntry *de = dictGetRandomKey(server.cluster->nodes);
2359 clusterNode *
this = dictGetVal(de);
2363 if (
this ==
myself)
continue;
2366 if (this->
flags & CLUSTER_NODE_PFAIL)
continue;
2373 if (this->
flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
2374 (this->link ==
NULL && this->numslots == 0))
2394 di = dictGetSafeIterator(server.cluster->nodes);
2395 while((de = dictNext(di)) !=
NULL && pfail_wanted > 0) {
2396 clusterNode *node = dictGetVal(de);
2397 if (node->flags & CLUSTER_NODE_HANDSHAKE)
continue;
2398 if (node->flags & CLUSTER_NODE_NOADDR)
continue;
2399 if (!(node->flags & CLUSTER_NODE_PFAIL))
continue;
2408 dictReleaseIterator(di);
2413 totlen =
sizeof(clusterMsg)-
sizeof(
union clusterMsgData);
2414 totlen += (
sizeof(clusterMsgDataGossip)*gossipcount);
2415 hdr->count =
htons(gossipcount);
2416 hdr->totlen = htonl(totlen);
2435 #define CLUSTER_BROADCAST_ALL 0
2436 #define CLUSTER_BROADCAST_LOCAL_SLAVES 1
2441 di = dictGetSafeIterator(server.cluster->nodes);
2442 while((de = dictNext(di)) !=
NULL) {
2443 clusterNode *node = dictGetVal(de);
2445 if (!node->link)
continue;
2446 if (node ==
myself || nodeInHandshake(node))
continue;
2449 nodeIsSlave(node) && node->slaveof &&
2450 (node->slaveof ==
myself || node->slaveof ==
myself->slaveof);
2451 if (!local_slave)
continue;
2455 dictReleaseIterator(di);
2462 unsigned char buf[
sizeof(clusterMsg)], *payload;
2463 clusterMsg *hdr = (clusterMsg*)
buf;
2467 channel = getDecodedObject(channel);
2469 channel_len = sdslen(channel->ptr);
2470 message_len = sdslen(
message->ptr);
2473 totlen =
sizeof(clusterMsg)-
sizeof(
union clusterMsgData);
2474 totlen +=
sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
2476 hdr->data.publish.msg.channel_len = htonl(channel_len);
2477 hdr->data.publish.msg.message_len = htonl(message_len);
2478 hdr->totlen = htonl(totlen);
2481 if (totlen <
sizeof(
buf)) {
2484 payload = zmalloc(totlen);
2485 memcpy(payload,hdr,
sizeof(*hdr));
2486 hdr = (clusterMsg*) payload;
2488 memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
2489 memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
2497 decrRefCount(channel);
2499 if (payload !=
buf) zfree(payload);
2508 unsigned char buf[
sizeof(clusterMsg)];
2509 clusterMsg *hdr = (clusterMsg*)
buf;
2512 memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
2520 unsigned char buf[
sizeof(clusterMsg)];
2521 clusterMsg *hdr = (clusterMsg*)
buf;
2525 memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
2526 hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
2527 memcpy(hdr->data.update.nodecfg.slots,node->slots,
sizeof(node->slots));
2553 unsigned char buf[
sizeof(clusterMsg)];
2554 clusterMsg *hdr = (clusterMsg*)
buf;
2561 if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
2562 totlen =
sizeof(clusterMsg)-
sizeof(
union clusterMsgData);
2563 hdr->totlen = htonl(totlen);
2569 unsigned char buf[
sizeof(clusterMsg)];
2570 clusterMsg *hdr = (clusterMsg*)
buf;
2573 if (!node->link)
return;
2575 totlen =
sizeof(clusterMsg)-
sizeof(
union clusterMsgData);
2576 hdr->totlen = htonl(totlen);
2582 unsigned char buf[
sizeof(clusterMsg)];
2583 clusterMsg *hdr = (clusterMsg*)
buf;
2586 if (!node->link)
return;
2588 totlen =
sizeof(clusterMsg)-
sizeof(
union clusterMsgData);
2589 hdr->totlen = htonl(totlen);
2595 clusterNode *master = node->slaveof;
2598 unsigned char *claimed_slots =
request->myslots;
2599 int force_ack =
request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
2606 if (nodeIsSlave(
myself) ||
myself->numslots == 0)
return;
2612 if (requestCurrentEpoch < server.cluster->currentEpoch) {
2613 serverLog(LL_WARNING,
2614 "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
2616 (
unsigned long long) requestCurrentEpoch,
2617 (
unsigned long long) server.cluster->currentEpoch);
2622 if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
2623 serverLog(LL_WARNING,
2624 "Failover auth denied to %.40s: already voted for epoch %llu",
2626 (
unsigned long long) server.cluster->currentEpoch);
2633 if (nodeIsMaster(node) || master ==
NULL ||
2634 (!nodeFailed(master) && !force_ack))
2636 if (nodeIsMaster(node)) {
2637 serverLog(LL_WARNING,
2638 "Failover auth denied to %.40s: it is a master node",
2640 }
else if (master ==
NULL) {
2641 serverLog(LL_WARNING,
2642 "Failover auth denied to %.40s: I don't know its master",
2644 }
else if (!nodeFailed(master)) {
2645 serverLog(LL_WARNING,
2646 "Failover auth denied to %.40s: its master is up",
2655 if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
2657 serverLog(LL_WARNING,
2658 "Failover auth denied to %.40s: "
2659 "can't vote about this master before %lld milliseconds",
2661 (
long long) ((server.cluster_node_timeout*2)-
2662 (mstime() - node->slaveof->voted_time)));
2669 for (j = 0; j < CLUSTER_SLOTS; j++) {
2671 if (server.cluster->slots[j] ==
NULL ||
2672 server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
2679 serverLog(LL_WARNING,
2680 "Failover auth denied to %.40s: "
2681 "slot %d epoch (%llu) > reqEpoch (%llu)",
2683 (
unsigned long long) server.cluster->slots[j]->configEpoch,
2684 (
unsigned long long) requestConfigEpoch);
2690 server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
2691 node->slaveof->voted_time = mstime();
2692 serverLog(LL_WARNING,
"Failover auth granted to %.40s for epoch %llu",
2693 node->name, (
unsigned long long) server.cluster->currentEpoch);
2711 clusterNode *master;
2713 serverAssert(nodeIsSlave(
myself));
2714 master =
myself->slaveof;
2715 if (master ==
NULL)
return 0;
2717 myoffset = replicationGetSlaveOffset();
2718 for (j = 0; j < master->numslaves; j++)
2719 if (master->slaves[j] !=
myself &&
2720 master->slaves[j]->repl_offset > myoffset) rank++;
2748 static time_t lastlog_time = 0;
2749 mstime_t nolog_fail_time = server.cluster_node_timeout + 5000;
2752 if (reason == server.cluster->cant_failover_reason &&
2753 time(
NULL)-lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
2756 server.cluster->cant_failover_reason = reason;
2762 nodeFailed(
myself->slaveof) &&
2763 (mstime() -
myself->slaveof->fail_time) < nolog_fail_time)
return;
2766 case CLUSTER_CANT_FAILOVER_DATA_AGE:
2767 msg =
"Disconnected from master for longer than allowed. "
2768 "Please check the 'cluster-slave-validity-factor' configuration "
2771 case CLUSTER_CANT_FAILOVER_WAITING_DELAY:
2772 msg =
"Waiting the delay before I can start a new failover.";
2774 case CLUSTER_CANT_FAILOVER_EXPIRED:
2775 msg =
"Failover attempt expired.";
2777 case CLUSTER_CANT_FAILOVER_WAITING_VOTES:
2778 msg =
"Waiting for votes, but majority still not reached.";
2781 msg =
"Unknown reason code.";
2785 serverLog(LL_WARNING,
"Currently unable to failover: %s",
msg);
2796 clusterNode *oldmaster =
myself->slaveof;
2798 if (nodeIsMaster(
myself) || oldmaster ==
NULL)
return;
2802 replicationUnsetMaster();
2805 for (j = 0; j < CLUSTER_SLOTS; j++) {
2834 mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
2835 int needed_quorum = (server.cluster->size / 2) + 1;
2836 int manual_failover = server.cluster->mf_end != 0 &&
2837 server.cluster->mf_can_start;
2838 mstime_t auth_timeout, auth_retry_time;
2840 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
2849 auth_timeout = server.cluster_node_timeout*2;
2850 if (auth_timeout < 2000) auth_timeout = 2000;
2851 auth_retry_time = auth_timeout*2;
2858 if (nodeIsMaster(
myself) ||
2860 (!nodeFailed(
myself->slaveof) && !manual_failover) ||
2861 myself->slaveof->numslots == 0)
2865 server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
2871 if (server.repl_state == REPL_STATE_CONNECTED) {
2872 data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
2875 data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
2881 if (data_age > server.cluster_node_timeout)
2882 data_age -= server.cluster_node_timeout;
2888 if (server.cluster_slave_validity_factor &&
2890 (((mstime_t)server.repl_ping_slave_period * 1000) +
2891 (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
2893 if (!manual_failover) {
2901 if (auth_age > auth_retry_time) {
2902 server.cluster->failover_auth_time = mstime() +
2905 server.cluster->failover_auth_count = 0;
2906 server.cluster->failover_auth_sent = 0;
2911 server.cluster->failover_auth_time +=
2912 server.cluster->failover_auth_rank * 1000;
2914 if (server.cluster->mf_end) {
2915 server.cluster->failover_auth_time = mstime();
2916 server.cluster->failover_auth_rank = 0;
2918 serverLog(LL_WARNING,
2919 "Start of election delayed for %lld milliseconds "
2920 "(rank #%d, offset %lld).",
2921 server.cluster->failover_auth_time - mstime(),
2922 server.cluster->failover_auth_rank,
2923 replicationGetSlaveOffset());
2936 if (server.cluster->failover_auth_sent == 0 &&
2937 server.cluster->mf_end == 0)
2940 if (newrank > server.cluster->failover_auth_rank) {
2941 long long added_delay =
2942 (newrank - server.cluster->failover_auth_rank) * 1000;
2943 server.cluster->failover_auth_time += added_delay;
2944 server.cluster->failover_auth_rank = newrank;
2945 serverLog(LL_WARNING,
2946 "Slave rank updated to #%d, added %lld milliseconds of delay.",
2947 newrank, added_delay);
2952 if (mstime() < server.cluster->failover_auth_time) {
2958 if (auth_age > auth_timeout) {
2964 if (server.cluster->failover_auth_sent == 0) {
2965 server.cluster->currentEpoch++;
2966 server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
2967 serverLog(LL_WARNING,
"Starting a failover election for epoch %llu.",
2968 (
unsigned long long) server.cluster->currentEpoch);
2970 server.cluster->failover_auth_sent = 1;
2972 CLUSTER_TODO_UPDATE_STATE|
2973 CLUSTER_TODO_FSYNC_CONFIG);
2978 if (server.cluster->failover_auth_count >= needed_quorum) {
2981 serverLog(LL_WARNING,
2982 "Failover election won: I'm the new master.");
2985 if (
myself->configEpoch < server.cluster->failover_auth_epoch) {
2986 myself->configEpoch = server.cluster->failover_auth_epoch;
2987 serverLog(LL_WARNING,
2988 "configEpoch set to %llu after successful failover",
2989 (
unsigned long long)
myself->configEpoch);
3027 int j, okslaves = 0;
3028 clusterNode *mymaster =
myself->slaveof, *target =
NULL, *candidate =
NULL;
3033 if (server.cluster->state != CLUSTER_OK)
return;
3037 if (mymaster ==
NULL)
return;
3038 for (j = 0; j < mymaster->numslaves; j++)
3039 if (!nodeFailed(mymaster->slaves[j]) &&
3040 !nodeTimedOut(mymaster->slaves[j])) okslaves++;
3041 if (okslaves <= server.cluster_migration_barrier)
return;
3054 di = dictGetSafeIterator(server.cluster->nodes);
3055 while((de = dictNext(di)) !=
NULL) {
3056 clusterNode *node = dictGetVal(de);
3057 int okslaves = 0, is_orphaned = 1;
3063 if (nodeIsSlave(node) || nodeFailed(node)) is_orphaned = 0;
3064 if (!(node->flags & CLUSTER_NODE_MIGRATE_TO)) is_orphaned = 0;
3068 if (okslaves > 0) is_orphaned = 0;
3071 if (!target && node->numslots > 0) target = node;
3075 if (!node->orphaned_time) node->orphaned_time = mstime();
3077 node->orphaned_time = 0;
3083 if (okslaves == max_slaves) {
3084 for (j = 0; j < node->numslaves; j++) {
3085 if (memcmp(node->slaves[j]->name,
3087 CLUSTER_NAMELEN) < 0)
3089 candidate = node->slaves[j];
3094 dictReleaseIterator(di);
3101 if (target && candidate ==
myself &&
3102 (mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY)
3104 serverLog(LL_WARNING,
"Migrating to orphaned master %.40s",
3145 if (server.cluster->mf_end && clientsArePaused()) {
3146 server.clients_pause_end_time = 0;
3149 server.cluster->mf_end = 0;
3150 server.cluster->mf_can_start = 0;
3151 server.cluster->mf_slave =
NULL;
3152 server.cluster->mf_master_offset = 0;
3157 if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
3158 serverLog(LL_WARNING,
"Manual failover timed out.");
3167 if (server.cluster->mf_end == 0)
return;
3171 if (server.cluster->mf_can_start)
return;
3173 if (server.cluster->mf_master_offset == 0)
return;
3175 if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
3178 server.cluster->mf_can_start = 1;
3179 serverLog(LL_WARNING,
3180 "All master replication stream processed, "
3181 "manual failover can start.");
3193 int update_state = 0;
3194 int orphaned_masters;
3197 mstime_t min_pong = 0, now = mstime();
3198 clusterNode *min_pong_node =
NULL;
3199 static unsigned long long iteration = 0;
3200 mstime_t handshake_timeout;
3208 static char *prev_ip =
NULL;
3209 char *curr_ip = server.cluster_announce_ip;
3212 if (prev_ip ==
NULL && curr_ip !=
NULL) changed = 1;
3213 if (prev_ip !=
NULL && curr_ip ==
NULL) changed = 1;
3214 if (prev_ip && curr_ip && strcmp(prev_ip,curr_ip)) changed = 1;
3218 if (prev_ip) prev_ip = zstrdup(prev_ip);
3221 strncpy(
myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN);
3222 myself->ip[NET_IP_STR_LEN-1] =
'\0';
3233 handshake_timeout = server.cluster_node_timeout;
3234 if (handshake_timeout < 1000) handshake_timeout = 1000;
3239 di = dictGetSafeIterator(server.cluster->nodes);
3240 server.cluster->stats_pfail_nodes = 0;
3241 while((de = dictNext(di)) !=
NULL) {
3242 clusterNode *node = dictGetVal(de);
3246 if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR))
continue;
3248 if (node->flags & CLUSTER_NODE_PFAIL)
3249 server.cluster->stats_pfail_nodes++;
3253 if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
3258 if (node->link ==
NULL) {
3260 mstime_t old_ping_sent;
3263 fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
3264 node->cport, NET_FIRST_BIND_ADDR);
3271 if (node->ping_sent == 0) node->ping_sent = mstime();
3272 serverLog(LL_DEBUG,
"Unable to connect to "
3273 "Cluster Node [%s]:%d -> %s", node->ip,
3274 node->cport, server.neterr);
3280 aeCreateFileEvent(server.el,
link->fd,AE_READABLE,
3288 old_ping_sent = node->ping_sent;
3290 CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
3291 if (old_ping_sent) {
3295 node->ping_sent = old_ping_sent;
3302 node->flags &= ~CLUSTER_NODE_MEET;
3304 serverLog(LL_DEBUG,
"Connecting with Node %.40s at %s:%d",
3305 node->name, node->ip, node->cport);
3308 dictReleaseIterator(di);
3312 if (!(iteration % 10)) {
3317 for (j = 0; j < 5; j++) {
3318 de = dictGetRandomKey(server.cluster->nodes);
3319 clusterNode *
this = dictGetVal(de);
3322 if (this->
link ==
NULL || this->ping_sent != 0)
continue;
3323 if (this->
flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
3325 if (min_pong_node ==
NULL || min_pong > this->pong_received) {
3326 min_pong_node =
this;
3327 min_pong = this->pong_received;
3330 if (min_pong_node) {
3331 serverLog(LL_DEBUG,
"Pinging node %.40s", min_pong_node->name);
3342 orphaned_masters = 0;
3345 di = dictGetSafeIterator(server.cluster->nodes);
3346 while((de = dictNext(di)) !=
NULL) {
3347 clusterNode *node = dictGetVal(de);
3352 (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
3357 if (nodeIsSlave(
myself) && nodeIsMaster(node) && !nodeFailed(node)) {
3363 if (okslaves == 0 && node->numslots > 0 &&
3364 node->flags & CLUSTER_NODE_MIGRATE_TO)
3368 if (okslaves > max_slaves) max_slaves = okslaves;
3370 this_slaves = okslaves;
3377 now - node->link->ctime >
3378 server.cluster_node_timeout &&
3380 node->pong_received < node->ping_sent &&
3382 now - node->ping_sent > server.cluster_node_timeout/2)
3393 node->ping_sent == 0 &&
3394 (now - node->pong_received) > server.cluster_node_timeout/2)
3402 if (server.cluster->mf_end &&
3404 server.cluster->mf_slave == node &&
3412 if (node->ping_sent == 0)
continue;
3417 delay = now - node->ping_sent;
3419 if (delay > server.cluster_node_timeout) {
3422 if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
3423 serverLog(LL_DEBUG,
"*** NODE %.40s possibly failing",
3425 node->flags |= CLUSTER_NODE_PFAIL;
3430 dictReleaseIterator(di);
3435 if (nodeIsSlave(
myself) &&
3436 server.masterhost ==
NULL &&
3438 nodeHasAddr(
myself->slaveof))
3440 replicationSetMaster(
myself->slaveof->ip,
myself->slaveof->port);
3446 if (nodeIsSlave(
myself)) {
3454 if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
3458 if (update_state || server.cluster->state == CLUSTER_FAIL)
3470 if (server.cluster->todo_before_sleep & CLUSTER_TODO_HANDLE_FAILOVER)
3474 if (server.cluster->todo_before_sleep & CLUSTER_TODO_UPDATE_STATE)
3478 if (server.cluster->todo_before_sleep & CLUSTER_TODO_SAVE_CONFIG) {
3479 int fsync = server.cluster->todo_before_sleep &
3480 CLUSTER_TODO_FSYNC_CONFIG;
3486 server.cluster->todo_before_sleep = 0;
3490 server.cluster->todo_before_sleep |=
flags;
3502 return (bitmap[
byte] & (1<<
bit)) != 0;
3509 bitmap[byte] |= 1<<
bit;
3516 bitmap[byte] &= ~(1<<
bit);
3523 dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
3526 while((de = dictNext(di)) !=
NULL) {
3527 clusterNode *node = dictGetVal(de);
3529 if (nodeIsSlave(node))
continue;
3530 slaves += node->numslaves;
3532 dictReleaseIterator(di);
3556 n->flags |= CLUSTER_NODE_MIGRATE_TO;
3565 if (old)
n->numslots--;
3579 if (server.cluster->slots[slot])
return C_ERR;
3581 server.cluster->slots[slot] =
n;
3589 clusterNode *
n = server.cluster->slots[slot];
3591 if (!
n)
return C_ERR;
3593 server.cluster->slots[slot] =
NULL;
3602 for (j = 0; j < CLUSTER_SLOTS; j++) {
3612 memset(server.cluster->migrating_slots_to,0,
3613 sizeof(server.cluster->migrating_slots_to));
3614 memset(server.cluster->importing_slots_from,0,
3615 sizeof(server.cluster->importing_slots_from));
/* Timing constants declared immediately before the cluster state update
 * logic (the code that computes new_state and logs "Cluster state changed").
 *
 * NOTE(review): the statements that consume these macros are not visible in
 * this excerpt. The state-update code compares a rejoin delay against
 * differences of mstime() values, so these are presumably expressed in
 * milliseconds, with MAX/MIN acting as bounds for that delay -- confirm at
 * the use sites before relying on this. */
3626 #define CLUSTER_MAX_REJOIN_DELAY 5000
3627 #define CLUSTER_MIN_REJOIN_DELAY 500
3628 #define CLUSTER_WRITABLE_DELAY 2000
3632 int reachable_masters = 0;
3633 static mstime_t among_minority_time;
3634 static mstime_t first_call_time = 0;
3636 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;
3644 if (first_call_time == 0) first_call_time = mstime();
3645 if (nodeIsMaster(
myself) &&
3646 server.cluster->state == CLUSTER_FAIL &&
3651 new_state = CLUSTER_OK;
3654 if (server.cluster_require_full_coverage) {
3655 for (j = 0; j < CLUSTER_SLOTS; j++) {
3656 if (server.cluster->slots[j] ==
NULL ||
3657 server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL))
3659 new_state = CLUSTER_FAIL;
3674 server.cluster->size = 0;
3675 di = dictGetSafeIterator(server.cluster->nodes);
3676 while((de = dictNext(di)) !=
NULL) {
3677 clusterNode *node = dictGetVal(de);
3679 if (nodeIsMaster(node) && node->numslots) {
3680 server.cluster->size++;
3681 if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0)
3682 reachable_masters++;
3685 dictReleaseIterator(di);
3691 int needed_quorum = (server.cluster->size / 2) + 1;
3693 if (reachable_masters < needed_quorum) {
3694 new_state = CLUSTER_FAIL;
3695 among_minority_time = mstime();
3700 if (new_state != server.cluster->state) {
3701 mstime_t rejoin_delay = server.cluster_node_timeout;
3712 if (new_state == CLUSTER_OK &&
3714 mstime() - among_minority_time < rejoin_delay)
3720 serverLog(LL_WARNING,
"Cluster state changed: %s",
3721 new_state == CLUSTER_OK ?
"ok" :
"fail");
3722 server.cluster->state = new_state;
3750 int update_config = 0;
3754 if (nodeIsSlave(
myself))
return C_OK;
3757 for (j = 1; j < server.dbnum; j++) {
3758 if (dictSize(server.db[j].dict))
return C_ERR;
3763 for (j = 0; j < CLUSTER_SLOTS; j++) {
3764 if (!countKeysInSlot(j))
continue;
3768 if (server.cluster->slots[j] ==
myself ||
3769 server.cluster->importing_slots_from[j] !=
NULL)
continue;
3777 if (server.cluster->slots[j] ==
NULL) {
3778 serverLog(LL_WARNING,
"I have keys for unassigned slot %d. "
3779 "Taking responsibility for it.",j);
3782 serverLog(LL_WARNING,
"I have keys for slot %d, but the slot is "
3783 "assigned to another node. "
3784 "Setting it to importing state.",j);
3785 server.cluster->importing_slots_from[j] = server.cluster->slots[j];
3800 serverAssert(
myself->numslots == 0);
3802 if (nodeIsMaster(
myself)) {
3803 myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
3804 myself->flags |= CLUSTER_NODE_SLAVE;
3812 replicationSetMaster(
n->ip,
n->port);
3826 {CLUSTER_NODE_MYSELF,
"myself,"},
3827 {CLUSTER_NODE_MASTER,
"master,"},
3828 {CLUSTER_NODE_SLAVE,
"slave,"},
3829 {CLUSTER_NODE_PFAIL,
"fail?,"},
3830 {CLUSTER_NODE_FAIL,
"fail,"},
3831 {CLUSTER_NODE_HANDSHAKE,
"handshake,"},
3832 {CLUSTER_NODE_NOADDR,
"noaddr,"}
3839 ci = sdscat(ci,
"noflags,");
3844 if (
flags & nodeflag->
flag) ci = sdscat(ci, nodeflag->
name);
3860 ci = sdscatprintf(sdsempty(),
"%.40s %s:%d@%d ",
3871 ci = sdscatprintf(ci,
" %.40s ",node->slaveof->name);
3873 ci = sdscatlen(ci,
" - ",3);
3876 ci = sdscatprintf(ci,
"%lld %lld %llu %s",
3877 (
long long) node->ping_sent,
3878 (
long long) node->pong_received,
3879 (
unsigned long long) node->configEpoch,
3880 (node->link || node->flags & CLUSTER_NODE_MYSELF) ?
3881 "connected" :
"disconnected");
3885 for (j = 0; j < CLUSTER_SLOTS; j++) {
3891 if (
start != -1 && (!
bit || j == CLUSTER_SLOTS-1)) {
3892 if (
bit && j == CLUSTER_SLOTS-1) j++;
3895 ci = sdscatprintf(ci,
" %d",
start);
3897 ci = sdscatprintf(ci,
" %d-%d",
start,j-1);
3906 if (node->flags & CLUSTER_NODE_MYSELF) {
3907 for (j = 0; j < CLUSTER_SLOTS; j++) {
3908 if (server.cluster->migrating_slots_to[j]) {
3909 ci = sdscatprintf(ci,
" [%d->-%.40s]",j,
3910 server.cluster->migrating_slots_to[j]->name);
3911 }
else if (server.cluster->importing_slots_from[j]) {
3912 ci = sdscatprintf(ci,
" [%d-<-%.40s]",j,
3913 server.cluster->importing_slots_from[j]->name);
3933 sds ci = sdsempty(), ni;
3937 di = dictGetSafeIterator(server.cluster->nodes);
3938 while((de = dictNext(di)) !=
NULL) {
3939 clusterNode *node = dictGetVal(de);
3941 if (node->flags &
filter)
continue;
3943 ci = sdscatsds(ci,ni);
3945 ci = sdscatlen(ci,
"\n",1);
3947 dictReleaseIterator(di);
3957 case CLUSTERMSG_TYPE_PING:
return "ping";
3958 case CLUSTERMSG_TYPE_PONG:
return "pong";
3959 case CLUSTERMSG_TYPE_MEET:
return "meet";
3960 case CLUSTERMSG_TYPE_FAIL:
return "fail";
3961 case CLUSTERMSG_TYPE_PUBLISH:
return "publish";
3962 case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST:
return "auth-req";
3963 case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK:
return "auth-ack";
3964 case CLUSTERMSG_TYPE_UPDATE:
return "update";
3965 case CLUSTERMSG_TYPE_MFSTART:
return "mfstart";
3973 if (getLongLongFromObject(o,&slot) != C_OK ||
3974 slot < 0 || slot >= CLUSTER_SLOTS)
3976 addReplyError(
c,
"Invalid or out of range slot");
3994 int num_masters = 0;
3995 void *slot_replylen = addDeferredMultiBulkLength(
c);
3998 dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
3999 while((de = dictNext(di)) !=
NULL) {
4000 clusterNode *node = dictGetVal(de);
4001 int j = 0,
start = -1;
4005 if (!nodeIsMaster(node) || node->numslots == 0)
continue;
4007 for (j = 0; j < CLUSTER_SLOTS; j++) {
4013 if (
start != -1 && (!
bit || j == CLUSTER_SLOTS-1)) {
4014 int nested_elements = 3;
4015 void *nested_replylen = addDeferredMultiBulkLength(
c);
4017 if (
bit && j == CLUSTER_SLOTS-1) j++;
4022 addReplyLongLong(
c,
start);
4023 addReplyLongLong(
c,
start);
4025 addReplyLongLong(
c,
start);
4026 addReplyLongLong(
c, j-1);
4031 addReplyMultiBulkLen(
c, 3);
4032 addReplyBulkCString(
c, node->ip);
4033 addReplyLongLong(
c, node->port);
4034 addReplyBulkCBuffer(
c, node->name, CLUSTER_NAMELEN);
4037 for (
i = 0;
i < node->numslaves;
i++) {
4040 if (nodeFailed(node->slaves[
i]))
continue;
4041 addReplyMultiBulkLen(
c, 3);
4042 addReplyBulkCString(
c, node->slaves[
i]->ip);
4043 addReplyLongLong(
c, node->slaves[
i]->port);
4044 addReplyBulkCBuffer(
c, node->slaves[
i]->name, CLUSTER_NAMELEN);
4047 setDeferredMultiBulkLength(
c, nested_replylen, nested_elements);
4052 dictReleaseIterator(di);
4053 setDeferredMultiBulkLength(
c, slot_replylen, num_masters);
4057 if (server.cluster_enabled == 0) {
4058 addReplyError(
c,
"This instance has cluster support disabled");
4062 if (!strcasecmp(
c->argv[1]->ptr,
"meet") && (
c->argc == 4 ||
c->argc == 5)) {
4064 long long port, cport;
4066 if (getLongLongFromObject(
c->argv[3], &port) != C_OK) {
4067 addReplyErrorFormat(
c,
"Invalid TCP base port specified: %s",
4068 (
char*)
c->argv[3]->ptr);
4073 if (getLongLongFromObject(
c->argv[4], &cport) != C_OK) {
4074 addReplyErrorFormat(
c,
"Invalid TCP bus port specified: %s",
4075 (
char*)
c->argv[4]->ptr);
4079 cport = port + CLUSTER_PORT_INCR;
4085 addReplyErrorFormat(
c,
"Invalid node address specified: %s:%s",
4086 (
char*)
c->argv[2]->ptr, (
char*)
c->argv[3]->ptr);
4088 addReply(
c,shared.ok);
4090 }
else if (!strcasecmp(
c->argv[1]->ptr,
"nodes") &&
c->argc == 2) {
4095 o = createObject(OBJ_STRING,ci);
4098 }
else if (!strcasecmp(
c->argv[1]->ptr,
"myid") &&
c->argc == 2) {
4100 addReplyBulkCBuffer(
c,
myself->name, CLUSTER_NAMELEN);
4101 }
else if (!strcasecmp(
c->argv[1]->ptr,
"slots") &&
c->argc == 2) {
4104 }
else if (!strcasecmp(
c->argv[1]->ptr,
"flushslots") &&
c->argc == 2) {
4106 if (dictSize(server.db[0].dict) != 0) {
4107 addReplyError(
c,
"DB must be empty to perform CLUSTER FLUSHSLOTS.");
4112 addReply(
c,shared.ok);
4113 }
else if ((!strcasecmp(
c->argv[1]->ptr,
"addslots") ||
4114 !strcasecmp(
c->argv[1]->ptr,
"delslots")) &&
c->argc >= 3)
4119 unsigned char *
slots = zmalloc(CLUSTER_SLOTS);
4120 int del = !strcasecmp(
c->argv[1]->ptr,
"delslots");
4125 for (j = 2; j <
c->argc; j++) {
4130 if (
del && server.cluster->slots[slot] ==
NULL) {
4131 addReplyErrorFormat(
c,
"Slot %d is already unassigned", slot);
4134 }
else if (!
del && server.cluster->slots[slot]) {
4135 addReplyErrorFormat(
c,
"Slot %d is already busy", slot);
4139 if (
slots[slot]++ == 1) {
4140 addReplyErrorFormat(
c,
"Slot %d specified multiple times",
4146 for (j = 0; j < CLUSTER_SLOTS; j++) {
4152 if (server.cluster->importing_slots_from[j])
4153 server.cluster->importing_slots_from[j] =
NULL;
4157 serverAssertWithInfo(
c,
NULL,retval == C_OK);
4162 addReply(
c,shared.ok);
4163 }
else if (!strcasecmp(
c->argv[1]->ptr,
"setslot") &&
c->argc >= 4) {
4171 if (nodeIsSlave(
myself)) {
4172 addReplyError(
c,
"Please use SETSLOT only with masters.");
4178 if (!strcasecmp(
c->argv[3]->ptr,
"migrating") &&
c->argc == 5) {
4179 if (server.cluster->slots[slot] !=
myself) {
4180 addReplyErrorFormat(
c,
"I'm not the owner of hash slot %u",slot);
4184 addReplyErrorFormat(
c,
"I don't know about node %s",
4185 (
char*)
c->argv[4]->ptr);
4188 server.cluster->migrating_slots_to[slot] =
n;
4189 }
else if (!strcasecmp(
c->argv[3]->ptr,
"importing") &&
c->argc == 5) {
4190 if (server.cluster->slots[slot] ==
myself) {
4191 addReplyErrorFormat(
c,
4192 "I'm already the owner of hash slot %u",slot);
/* Report the node id the caller supplied. In this branch argv[3] is the
 * action keyword (it was just matched against "importing"), so printing it
 * would always yield "I don't know about node importing". The node id is
 * argv[4], which is guaranteed to exist because this branch requires
 * c->argc == 5; this also matches the MIGRATING and NODE branches. */
4196 addReplyErrorFormat(
c,
"I don't know about node %s",
4197 (
char*)
c->argv[4]->ptr);
4200 server.cluster->importing_slots_from[slot] =
n;
4201 }
else if (!strcasecmp(
c->argv[3]->ptr,
"stable") &&
c->argc == 4) {
4203 server.cluster->importing_slots_from[slot] =
NULL;
4204 server.cluster->migrating_slots_to[slot] =
NULL;
4205 }
else if (!strcasecmp(
c->argv[3]->ptr,
"node") &&
c->argc == 5) {
4210 addReplyErrorFormat(
c,
"Unknown node %s",
4211 (
char*)
c->argv[4]->ptr);
4217 if (countKeysInSlot(slot) != 0) {
4218 addReplyErrorFormat(
c,
4219 "Can't assign hashslot %d to a different node "
4220 "while I still hold keys for this hash slot.", slot);
4227 if (countKeysInSlot(slot) == 0 &&
4228 server.cluster->migrating_slots_to[slot])
4229 server.cluster->migrating_slots_to[slot] =
NULL;
4234 server.cluster->importing_slots_from[slot])
4246 serverLog(LL_WARNING,
4247 "configEpoch updated after importing slot %d", slot);
4249 server.cluster->importing_slots_from[slot] =
NULL;
4255 "Invalid CLUSTER SETSLOT action or number of arguments");
4259 addReply(
c,shared.ok);
4260 }
else if (!strcasecmp(
c->argv[1]->ptr,
"bumpepoch") &&
c->argc == 2) {
4263 sds reply = sdscatprintf(sdsempty(),
"+%s %llu\r\n",
4264 (retval == C_OK) ?
"BUMPED" :
"STILL",
4265 (
unsigned long long)
myself->configEpoch);
4266 addReplySds(
c,reply);
4267 }
else if (!strcasecmp(
c->argv[1]->ptr,
"info") &&
c->argc == 2) {
4269 char *statestr[] = {
"ok",
"fail",
"needhelp"};
4270 int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
4274 for (j = 0; j < CLUSTER_SLOTS; j++) {
4275 clusterNode *
n = server.cluster->slots[j];
4277 if (
n ==
NULL)
continue;
4279 if (nodeFailed(
n)) {
4281 }
else if (nodeTimedOut(
n)) {
4291 sds
info = sdscatprintf(sdsempty(),
4292 "cluster_state:%s\r\n"
4293 "cluster_slots_assigned:%d\r\n"
4294 "cluster_slots_ok:%d\r\n"
4295 "cluster_slots_pfail:%d\r\n"
4296 "cluster_slots_fail:%d\r\n"
4297 "cluster_known_nodes:%lu\r\n"
4298 "cluster_size:%d\r\n"
4299 "cluster_current_epoch:%llu\r\n"
4300 "cluster_my_epoch:%llu\r\n"
4301 , statestr[server.cluster->state],
4306 dictSize(server.cluster->nodes),
4307 server.cluster->size,
4308 (
unsigned long long) server.cluster->currentEpoch,
4309 (
unsigned long long) myepoch
4313 long long tot_msg_sent = 0;
4314 long long tot_msg_received = 0;
4316 for (
int i = 0;
i < CLUSTERMSG_TYPE_COUNT;
i++) {
4317 if (server.cluster->stats_bus_messages_sent[
i] == 0)
continue;
4318 tot_msg_sent += server.cluster->stats_bus_messages_sent[
i];
4320 "cluster_stats_messages_%s_sent:%lld\r\n",
4322 server.cluster->stats_bus_messages_sent[
i]);
4325 "cluster_stats_messages_sent:%lld\r\n", tot_msg_sent);
4327 for (
int i = 0;
i < CLUSTERMSG_TYPE_COUNT;
i++) {
4328 if (server.cluster->stats_bus_messages_received[
i] == 0)
continue;
4329 tot_msg_received += server.cluster->stats_bus_messages_received[
i];
4331 "cluster_stats_messages_%s_received:%lld\r\n",
4333 server.cluster->stats_bus_messages_received[
i]);
4336 "cluster_stats_messages_received:%lld\r\n", tot_msg_received);
4339 addReplySds(
c,sdscatprintf(sdsempty(),
"$%lu\r\n",
4340 (
unsigned long)sdslen(
info)));
4341 addReplySds(
c,
info);
4342 addReply(
c,shared.crlf);
4343 }
else if (!strcasecmp(
c->argv[1]->ptr,
"saveconfig") &&
c->argc == 2) {
4347 addReply(
c,shared.ok);
4349 addReplyErrorFormat(
c,
"error saving the cluster node config: %s",
4351 }
else if (!strcasecmp(
c->argv[1]->ptr,
"keyslot") &&
c->argc == 3) {
4353 sds
key =
c->argv[2]->ptr;
4356 }
else if (!strcasecmp(
c->argv[1]->ptr,
"countkeysinslot") &&
c->argc == 3) {
4360 if (getLongLongFromObjectOrReply(
c,
c->argv[2],&slot,
NULL) != C_OK)
4362 if (slot < 0 || slot >= CLUSTER_SLOTS) {
4363 addReplyError(
c,
"Invalid slot");
4366 addReplyLongLong(
c,countKeysInSlot(slot));
4367 }
else if (!strcasecmp(
c->argv[1]->ptr,
"getkeysinslot") &&
c->argc == 4) {
4369 long long maxkeys, slot;
4370 unsigned int numkeys, j;
4373 if (getLongLongFromObjectOrReply(
c,
c->argv[2],&slot,
NULL) != C_OK)
4375 if (getLongLongFromObjectOrReply(
c,
c->argv[3],&maxkeys,
NULL)
4378 if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
4379 addReplyError(
c,
"Invalid slot or number of keys");
4383 keys = zmalloc(
sizeof(robj*)*maxkeys);
4384 numkeys = getKeysInSlot(slot,
keys, maxkeys);
4385 addReplyMultiBulkLen(
c,numkeys);
4386 for (j = 0; j < numkeys; j++) {
4387 addReplyBulk(
c,
keys[j]);
4388 decrRefCount(
keys[j]);
4391 }
else if (!strcasecmp(
c->argv[1]->ptr,
"forget") &&
c->argc == 3) {
4396 addReplyErrorFormat(
c,
"Unknown node %s", (
char*)
c->argv[2]->ptr);
4399 addReplyError(
c,
"I tried hard but I can't forget myself...");
4402 addReplyError(
c,
"Can't forget my master!");
4408 CLUSTER_TODO_SAVE_CONFIG);
4409 addReply(
c,shared.ok);
4410 }
else if (!strcasecmp(
c->argv[1]->ptr,
"replicate") &&
c->argc == 3) {
4416 addReplyErrorFormat(
c,
"Unknown node %s", (
char*)
c->argv[2]->ptr);
4422 addReplyError(
c,
"Can't replicate myself");
4427 if (nodeIsSlave(
n)) {
4428 addReplyError(
c,
"I can only replicate a master, not a slave.");
4435 if (nodeIsMaster(
myself) &&
4436 (
myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
4438 "To set a master the node must be empty and "
4439 "without assigned slots.");
4446 addReply(
c,shared.ok);
4447 }
else if (!strcasecmp(
c->argv[1]->ptr,
"slaves") &&
c->argc == 3) {
4454 addReplyErrorFormat(
c,
"Unknown node %s", (
char*)
c->argv[2]->ptr);
4458 if (nodeIsSlave(
n)) {
4459 addReplyError(
c,
"The specified node is not a master");
4463 addReplyMultiBulkLen(
c,
n->numslaves);
4464 for (j = 0; j <
n->numslaves; j++) {
4466 addReplyBulkCString(
c,ni);
4469 }
else if (!strcasecmp(
c->argv[1]->ptr,
"count-failure-reports") &&
4476 addReplyErrorFormat(
c,
"Unknown node %s", (
char*)
c->argv[2]->ptr);
4481 }
else if (!strcasecmp(
c->argv[1]->ptr,
"failover") &&
4482 (
c->argc == 2 ||
c->argc == 3))
4485 int force = 0, takeover = 0;
4488 if (!strcasecmp(
c->argv[2]->ptr,
"force")) {
4490 }
else if (!strcasecmp(
c->argv[2]->ptr,
"takeover")) {
4494 addReply(
c,shared.syntaxerr);
4500 if (nodeIsMaster(
myself)) {
4501 addReplyError(
c,
"You should send CLUSTER FAILOVER to a slave");
4504 addReplyError(
c,
"I'm a slave but my master is unknown to me");
4506 }
else if (!force &&
4507 (nodeFailed(
myself->slaveof) ||
4510 addReplyError(
c,
"Master is down or failed, "
4511 "please use CLUSTER FAILOVER FORCE");
4515 server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
4522 serverLog(LL_WARNING,
"Taking over the master (user request).");
4529 serverLog(LL_WARNING,
"Forced failover user request accepted.");
4530 server.cluster->mf_can_start = 1;
4532 serverLog(LL_WARNING,
"Manual failover user request accepted.");
4535 addReply(
c,shared.ok);
4536 }
else if (!strcasecmp(
c->argv[1]->ptr,
"set-config-epoch") &&
c->argc == 3)
4547 if (getLongLongFromObjectOrReply(
c,
c->argv[2],&epoch,
NULL) != C_OK)
4551 addReplyErrorFormat(
c,
"Invalid config epoch specified: %lld",epoch);
4552 }
else if (dictSize(server.cluster->nodes) > 1) {
4553 addReplyError(
c,
"The user can assign a config epoch only when the "
4554 "node does not know any other node.");
4555 }
else if (
myself->configEpoch != 0) {
4556 addReplyError(
c,
"Node config epoch is already non-zero");
4558 myself->configEpoch = epoch;
4559 serverLog(LL_WARNING,
4560 "configEpoch set to %llu via CLUSTER SET-CONFIG-EPOCH",
4561 (
unsigned long long)
myself->configEpoch);
4563 if (server.cluster->currentEpoch < (
uint64_t)epoch)
4564 server.cluster->currentEpoch = epoch;
4569 CLUSTER_TODO_SAVE_CONFIG);
4570 addReply(
c,shared.ok);
4572 }
else if (!strcasecmp(
c->argv[1]->ptr,
"reset") &&
4573 (
c->argc == 2 ||
c->argc == 3))
4580 if (!strcasecmp(
c->argv[2]->ptr,
"hard")) {
4582 }
else if (!strcasecmp(
c->argv[2]->ptr,
"soft")) {
4585 addReply(
c,shared.syntaxerr);
4592 if (nodeIsMaster(
myself) && dictSize(
c->db->dict) != 0) {
4593 addReplyError(
c,
"CLUSTER RESET can't be called with "
4594 "master nodes containing keys");
4598 addReply(
c,shared.ok);
4600 addReplyError(
c,
"Wrong CLUSTER subcommand or number of arguments");
4611 unsigned char buf[2];
4616 rioInitWithBuffer(payload,sdsempty());
4617 serverAssert(rdbSaveObjectType(payload,o));
4618 serverAssert(rdbSaveObject(payload,o));
4628 buf[0] = RDB_VERSION & 0xff;
4629 buf[1] = (RDB_VERSION >> 8) & 0xff;
4630 payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,
buf,2);
4633 crc = crc64(0,(
unsigned char*)payload->io.buffer.ptr,
4634 sdslen(payload->io.buffer.ptr));
4636 payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
4644 unsigned char *footer;
4649 if (
len < 10)
return C_ERR;
4650 footer =
p+(
len-10);
4653 rdbver = (footer[1] << 8) | footer[0];
4654 if (rdbver > RDB_VERSION)
return C_ERR;
4657 crc = crc64(0,
p,
len-8);
4659 return (memcmp(&crc,footer+2,8) == 0) ? C_OK : C_ERR;
4670 if ((o = lookupKeyRead(
c->db,
c->argv[1])) ==
NULL) {
4671 addReply(
c,shared.nullbulk);
4679 dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr);
4680 addReplyBulk(
c,dumpobj);
4681 decrRefCount(dumpobj);
4693 for (j = 4; j <
c->argc; j++) {
4694 if (!strcasecmp(
c->argv[j]->ptr,
"replace")) {
4697 addReply(
c,shared.syntaxerr);
4704 addReply(
c,shared.busykeyerr);
4709 if (getLongLongFromObjectOrReply(
c,
c->argv[2],&ttl,
NULL) != C_OK) {
4711 }
else if (ttl < 0) {
4712 addReplyError(
c,
"Invalid TTL value, must be >= 0");
4719 addReplyError(
c,
"DUMP payload version or checksum are wrong");
4723 rioInitWithBuffer(&payload,
c->argv[3]->ptr);
4724 if (((
type = rdbLoadObjectType(&payload)) == -1) ||
4725 ((obj = rdbLoadObject(
type,&payload)) ==
NULL))
4727 addReplyError(
c,
"Bad data format");
4732 if (
replace) dbDelete(
c->db,
c->argv[1]);
4735 dbAdd(
c->db,
c->argv[1],obj);
4736 if (ttl) setExpire(
c,
c->db,
c->argv[1],mstime()+ttl);
4737 signalModifiedKey(
c->db,
c->argv[1]);
4738 addReply(
c,shared.ok);
4748 #define MIGRATE_SOCKET_CACHE_ITEMS 64
4749 #define MIGRATE_SOCKET_CACHE_TTL 10
4770 sds
name = sdsempty();
4774 name = sdscatlen(
name,host->ptr,sdslen(host->ptr));
4776 name = sdscatlen(
name,port->ptr,sdslen(port->ptr));
4777 cs = dictFetchValue(server.migrate_cached_sockets,
name);
4780 cs->last_use_time = server.unixtime;
4787 dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
4788 cs = dictGetVal(de);
4791 dictDelete(server.migrate_cached_sockets,dictGetKey(de));
4795 fd = anetTcpNonBlockConnect(server.neterr,
c->argv[1]->ptr,
4796 atoi(
c->argv[2]->ptr));
4799 addReplyErrorFormat(
c,
"Can't connect to target node: %s",
4803 anetEnableTcpNoDelay(server.neterr,
fd);
4806 if ((aeWait(
fd,AE_WRITABLE,
timeout) & AE_WRITABLE) == 0) {
4809 sdsnew(
"-IOERR error or timeout connecting to the client\r\n"));
4815 cs = zmalloc(
sizeof(*
cs));
4818 cs->last_use_time = server.unixtime;
4819 dictAdd(server.migrate_cached_sockets,
name,
cs);
4825 sds
name = sdsempty();
4828 name = sdscatlen(
name,host->ptr,sdslen(host->ptr));
4830 name = sdscatlen(
name,port->ptr,sdslen(port->ptr));
4831 cs = dictFetchValue(server.migrate_cached_sockets,
name);
4839 dictDelete(server.migrate_cached_sockets,
name);
4844 dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
4847 while((de = dictNext(di)) !=
NULL) {
4853 dictDelete(server.migrate_cached_sockets,dictGetKey(de));
4856 dictReleaseIterator(di);
4871 robj **newargv =
NULL;
4874 int write_error = 0;
4875 int argv_rewritten = 0;
4886 for (j = 6; j <
c->argc; j++) {
4887 if (!strcasecmp(
c->argv[j]->ptr,
"copy")) {
4889 }
else if (!strcasecmp(
c->argv[j]->ptr,
"replace")) {
4891 }
else if (!strcasecmp(
c->argv[j]->ptr,
"keys")) {
4892 if (sdslen(
c->argv[3]->ptr) != 0) {
4894 "When using MIGRATE KEYS option, the key argument"
4895 " must be set to the empty string");
4899 num_keys =
c->argc - j - 1;
4902 addReply(
c,shared.syntaxerr);
4908 if (getLongFromObjectOrReply(
c,
c->argv[5],&
timeout,
NULL) != C_OK ||
4909 getLongFromObjectOrReply(
c,
c->argv[4],&dbid,
NULL) != C_OK)
4920 ov = zrealloc(ov,
sizeof(robj*)*num_keys);
4921 kv = zrealloc(kv,
sizeof(robj*)*num_keys);
4924 for (j = 0; j < num_keys; j++) {
4925 if ((ov[oi] = lookupKeyRead(
c->db,
c->argv[first_key+j])) !=
NULL) {
4926 kv[oi] =
c->argv[first_key+j];
4931 if (num_keys == 0) {
4932 zfree(ov); zfree(kv);
4933 addReplySds(
c,sdsnew(
"+NOKEY\r\n"));
4943 zfree(ov); zfree(kv);
4947 rioInitWithBuffer(&
cmd,sdsempty());
4950 int select =
cs->last_dbid != dbid;
4952 serverAssertWithInfo(
c,
NULL,rioWriteBulkCount(&
cmd,
'*',2));
4953 serverAssertWithInfo(
c,
NULL,rioWriteBulkString(&
cmd,
"SELECT",6));
4954 serverAssertWithInfo(
c,
NULL,rioWriteBulkLongLong(&
cmd,dbid));
4958 for (j = 0; j < num_keys; j++) {
4960 long long expireat = getExpire(
c->db,kv[j]);
4962 if (expireat != -1) {
4963 ttl = expireat-mstime();
4964 if (ttl < 1) ttl = 1;
4966 serverAssertWithInfo(
c,
NULL,rioWriteBulkCount(&
cmd,
'*',
replace ? 5 : 4));
4967 if (server.cluster_enabled)
4968 serverAssertWithInfo(
c,
NULL,
4969 rioWriteBulkString(&
cmd,
"RESTORE-ASKING",14));
4971 serverAssertWithInfo(
c,
NULL,rioWriteBulkString(&
cmd,
"RESTORE",7));
4972 serverAssertWithInfo(
c,
NULL,sdsEncodedObject(kv[j]));
4973 serverAssertWithInfo(
c,
NULL,rioWriteBulkString(&
cmd,kv[j]->ptr,
4974 sdslen(kv[j]->ptr)));
4975 serverAssertWithInfo(
c,
NULL,rioWriteBulkLongLong(&
cmd,ttl));
4980 serverAssertWithInfo(
c,
NULL,
4981 rioWriteBulkString(&
cmd,payload.io.buffer.ptr,
4982 sdslen(payload.io.buffer.ptr)));
4983 sdsfree(payload.io.buffer.ptr);
4988 serverAssertWithInfo(
c,
NULL,rioWriteBulkString(&
cmd,
"REPLACE",7));
4994 sds
buf =
cmd.io.buffer.ptr;
4995 size_t pos = 0, towrite;
4998 while ((towrite = sdslen(
buf)-
pos) > 0) {
4999 towrite = (towrite > (64*1024) ? (64*1024) : towrite);
5001 if (nwritten != (
signed)towrite) {
5017 int error_from_target = 0;
5018 int socket_error = 0;
5021 if (!copy) newargv = zmalloc(
sizeof(robj*)*(num_keys+1));
5023 for (j = 0; j < num_keys; j++) {
5028 if ((
select && buf1[0] ==
'-') ||
buf2[0] ==
'-') {
5030 if (!error_from_target) {
5032 addReplyErrorFormat(
c,
"Target instance replied with error: %s",
5033 (
select && buf1[0] ==
'-') ? buf1+1 :
buf2+1);
5034 error_from_target = 1;
5039 dbDelete(
c->db,kv[j]);
5040 signalModifiedKey(
c->db,kv[j]);
5044 newargv[del_idx++] = kv[j];
5045 incrRefCount(kv[j]);
5053 if (!error_from_target && socket_error && j == 0 && may_retry &&
5069 newargv[0] = createStringObject(
"DEL",3);
5071 replaceClientCommandVector(
c,del_idx,newargv);
5083 if (!error_from_target && socket_error) {
5088 if (!error_from_target) {
5095 cs->last_dbid = dbid;
5096 addReply(
c,shared.ok);
5102 sdsfree(
cmd.io.buffer.ptr);
5103 zfree(ov); zfree(kv); zfree(newargv);
5112 sdsfree(
cmd.io.buffer.ptr);
5130 zfree(ov); zfree(kv);
5132 sdscatprintf(sdsempty(),
5133 "-IOERR error or timeout %s to target instance\r\n",
5134 write_error ?
"writing" :
"reading"));
5147 if (server.cluster_enabled == 0) {
5148 addReplyError(
c,
"This instance has cluster support disabled");
5151 c->flags |= CLIENT_ASKING;
5152 addReply(
c,shared.ok);
5159 if (server.cluster_enabled == 0) {
5160 addReplyError(
c,
"This instance has cluster support disabled");
5163 c->flags |= CLIENT_READONLY;
5164 addReply(
c,shared.ok);
5169 c->flags &= ~CLIENT_READONLY;
5170 addReply(
c,shared.ok);
5206 clusterNode *
n =
NULL;
5207 robj *firstkey =
NULL;
5208 int multiple_keys = 0;
5209 multiState *ms, _ms;
5211 int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
5218 if (
cmd->proc == execCommand) {
5221 if (!(
c->flags & CLIENT_MULTI))
return myself;
5237 for (
i = 0;
i < ms->count;
i++) {
5238 struct redisCommand *mcmd;
5240 int margc, *keyindex, numkeys, j;
5242 mcmd = ms->commands[
i].cmd;
5243 margc = ms->commands[
i].argc;
5244 margv = ms->commands[
i].argv;
5246 keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
5247 for (j = 0; j < numkeys; j++) {
5248 robj *thiskey = margv[keyindex[j]];
5250 sdslen(thiskey->ptr));
5252 if (firstkey ==
NULL) {
5257 n = server.cluster->slots[slot];
5264 getKeysFreeResult(keyindex);
5276 server.cluster->migrating_slots_to[slot] !=
NULL)
5279 }
else if (server.cluster->importing_slots_from[slot] !=
NULL) {
5285 if (!equalStringObjects(firstkey,thiskey)) {
5286 if (slot != thisslot) {
5288 getKeysFreeResult(keyindex);
5301 if ((migrating_slot || importing_slot) &&
5302 lookupKeyRead(&server.db[0],thiskey) ==
NULL)
5307 getKeysFreeResult(keyindex);
5315 if (server.cluster->state != CLUSTER_OK) {
5321 if (hashslot) *hashslot = slot;
5331 if (migrating_slot && missing_keys) {
5333 return server.cluster->migrating_slots_to[slot];
5340 if (importing_slot &&
5341 (
c->flags & CLIENT_ASKING ||
cmd->flags & CMD_ASKING))
5343 if (multiple_keys && missing_keys) {
5354 if (
c->flags & CLIENT_READONLY &&
5355 cmd->flags & CMD_READONLY &&
5376 if (
error_code == CLUSTER_REDIR_CROSS_SLOT) {
5377 addReplySds(
c,sdsnew(
"-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
5378 }
else if (
error_code == CLUSTER_REDIR_UNSTABLE) {
5382 addReplySds(
c,sdsnew(
"-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
5383 }
else if (
error_code == CLUSTER_REDIR_DOWN_STATE) {
5384 addReplySds(
c,sdsnew(
"-CLUSTERDOWN The cluster is down\r\n"));
5385 }
else if (
error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
5386 addReplySds(
c,sdsnew(
"-CLUSTERDOWN Hash slot not served\r\n"));
5387 }
else if (
error_code == CLUSTER_REDIR_MOVED ||
5390 addReplySds(
c,sdscatprintf(sdsempty(),
5392 (
error_code == CLUSTER_REDIR_ASK) ?
"ASK" :
"MOVED",
5393 hashslot,
n->ip,
n->port));
5395 serverPanic(
"getNodeByQuery() unknown error.");
5411 if (
c->flags & CLIENT_BLOCKED &&
c->btype == BLOCKED_LIST) {
5416 if (server.cluster->state == CLUSTER_FAIL) {
5421 di = dictGetIterator(
c->bpop.keys);
5422 while((de = dictNext(di)) !=
NULL) {
5423 robj *
key = dictGetKey(de);
5425 clusterNode *node = server.cluster->slots[slot];
5431 server.cluster->importing_slots_from[slot] ==
NULL)
5435 CLUSTER_REDIR_DOWN_UNBOUND);
5438 CLUSTER_REDIR_MOVED);
5443 dictReleaseIterator(di);
RzBinInfo * info(RzBinFile *bf)
int clusterBlacklistExists(char *nodeid)
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request)
void clusterHandleManualFailover(void)
void clearNodeFailureIfNeeded(clusterNode *node)
void clusterRequestFailoverAuth(void)
void clusterHandleConfigEpochCollision(clusterNode *sender)
static struct redisNodeFlags redisNodeFlagsTable[]
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code)
void clusterBeforeSleep(void)
void clusterSendUpdate(clusterLink *link, clusterNode *node)
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link)
void manualFailoverCheckTimeout(void)
#define CLUSTER_MIN_REJOIN_DELAY
void clusterBroadcastPong(int target)
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask)
void clusterSendFailoverAuth(clusterNode *node)
void clusterBroadcastMessage(void *buf, size_t len)
void clusterNodeCleanupFailureReports(clusterNode *node)
void clusterCloseAllSlots(void)
void clusterPropagatePublish(robj *channel, robj *message)
sds representClusterNodeFlags(sds ci, uint16_t flags)
void clusterSaveConfigOrDie(int do_fsync)
void clusterUpdateState(void)
void clusterRenameNode(clusterNode *node, char *newname)
#define CLUSTER_BROADCAST_LOCAL_SLAVES
int bitmapTestBit(unsigned char *bitmap, int pos)
int clusterRedirectBlockedClientIfNeeded(client *c)
int clusterNodeGetSlotBit(clusterNode *n, int slot)
void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots)
void readwriteCommand(client *c)
void migrateCommand(client *c)
void migrateCloseTimedoutSockets(void)
struct migrateCachedSocket migrateCachedSocket
sds clusterGenNodesDescription(int filter)
int clusterLockConfig(char *filename)
int clusterAddSlot(clusterNode *n, int slot)
int clusterStartHandshake(char *ip, int port, int cport)
int clusterCountNonFailingSlaves(clusterNode *n)
int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender)
clusterNode * createClusterNode(char *nodename, int flags)
void clusterDoBeforeSleep(int flags)
int clusterBumpConfigEpochWithoutConsensus(void)
void clusterReplyMultiBulkSlots(client *c)
int clusterHandshakeInProgress(char *ip, int port, int cport)
void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n)
void clusterHandleSlaveFailover(void)
int clusterProcessPacket(clusterLink *link)
int clusterNodeFailureReportsCount(clusterNode *node)
void restoreCommand(client *c)
void clusterCommand(client *c)
void clusterDelNode(clusterNode *delnode)
int clusterLoadConfig(char *filename)
#define CLUSTER_BLACKLIST_TTL
migrateCachedSocket * migrateGetSocket(client *c, robj *host, robj *port, long timeout)
void clusterSetNodeAsMaster(clusterNode *n)
void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask)
int clusterNodeClearSlotBit(clusterNode *n, int slot)
void createDumpPayload(rio *payload, robj *o)
int verifyDumpPayload(unsigned char *p, size_t len)
void clusterSendPing(clusterLink *link, int type)
void clusterSendFail(char *nodename)
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask)
void readonlyCommand(client *c)
#define MAX_CLUSTER_ACCEPTS_PER_CALL
void bitmapSetBit(unsigned char *bitmap, int pos)
int getSlotOrReply(client *c, robj *o)
void clusterSetMaster(clusterNode *n)
unsigned int keyHashSlot(char *key, int keylen)
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen)
void handleLinkIOError(clusterLink *link)
void clusterSendPublish(clusterLink *link, robj *channel, robj *message)
int clusterAddNode(clusterNode *node)
clusterNode * getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code)
void clusterLogCantFailover(int reason)
void clusterBlacklistCleanup(void)
void clusterBlacklistAddNode(clusterNode *node)
void bitmapClearBit(unsigned char *bitmap, int pos)
int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender)
int verifyClusterConfigWithData(void)
#define MIGRATE_SOCKET_CACHE_ITEMS
void markNodeAsFailingIfNeeded(clusterNode *node)
const char * clusterGetMessageTypeString(int type)
void dumpCommand(client *c)
void clusterReset(int hard)
void clusterBuildMessageHdr(clusterMsg *hdr, int type)
#define MIGRATE_SOCKET_CACHE_TTL
void resetManualFailover(void)
int clusterDelNodeSlots(clusterNode *node)
int clusterDelSlot(int slot)
int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, clusterMsg *hdr)
#define CLUSTER_MAX_REJOIN_DELAY
clusterLink * createClusterLink(clusterNode *node)
void clusterHandleSlaveMigration(int max_slaves)
int clusterNodeSetSlotBit(clusterNode *n, int slot)
void askingCommand(client *c)
uint64_t clusterGetMaxEpoch(void)
void clusterSendMFStart(clusterNode *node)
void clusterFailoverReplaceYourMaster(void)
void freeClusterNode(clusterNode *n)
int clusterMastersHaveSlaves(void)
int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave)
int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n)
clusterNode * clusterLookupNode(char *name)
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave)
sds clusterGenNodeDescription(clusterNode *node)
int clusterGetSlaveRank(void)
#define CLUSTER_WRITABLE_DELAY
void nodeIp2String(char *buf, clusterLink *link, char *announced_ip)
#define CLUSTER_BROADCAST_ALL
int clusterSaveConfig(int do_fsync)
void freeClusterLink(clusterLink *link)
void migrateCloseSocket(robj *host, robj *port)
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags static arg static fd static protocol static who struct sockaddr static addrlen static backlog struct timeval struct timezone static tz const struct iovec static count static mode const void const struct sockaddr static tolen const char static pathname void static offset struct stat static buf void long static basep static whence ftruncate
static static sync static getppid static getegid const char static filename request
static static fork const void static count close
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags static arg static fd static protocol static who struct sockaddr static addrlen static backlog struct timeval struct timezone static tz const struct iovec static count static mode const void const struct sockaddr static tolen const char static pathname void count
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags static arg static fd static protocol static who struct sockaddr static addrlen static backlog struct timeval struct timezone static tz const struct iovec static count static mode const void const struct sockaddr static tolen const char static pathname void static offset struct stat static buf void long static basep static whence static length const void static len static semflg const void static shmflg const struct timespec struct timespec static rem const char static group const void start
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags cmd
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags static arg static fd static protocol static who struct sockaddr static addrlen static backlog struct timeval struct timezone static tz const struct iovec static count static mode const void const struct sockaddr static tolen const char static pathname void static offset fstat
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags static arg static fd static protocol static who struct sockaddr static addrlen static backlog struct timeval struct timezone static tz const struct iovec static count static mode const void const struct sockaddr static tolen const char static pathname void static offset struct stat static buf void long static basep static whence static length const void static len key
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags long
static static sync static getppid static getegid const char static filename char static len const char char static bufsiz static mask static vfork const void static prot static getpgrp const char static swapflags static arg fsync
static static fork const void static count static fd link
return memset(p, 0, total)
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))
static void list(RzEgg *egg)
static static fork const void static count static fd const char const char static newpath char char char static envp time
static static fork const void static count static fd const char const char static newpath char char argv
static static fork const void static count static fd const char const char static newpath char char char static envp time_t static t const char static mode static whence const char static dir time_t static t unsigned static seconds const char struct utimbuf static buf static inc static sig const char static mode static oldfd struct tms static buf static getgid static geteuid const char static filename static arg static mask struct ustat static ubuf static getppid static setsid static egid sigset_t static set struct timeval struct timezone static tz select
static uint32_t const uint8_t * buf2
static struct @218 keys[]
static struct sockaddr static addrlen static backlog const void static flags void flags
static struct sockaddr static addrlen static backlog const void msg
int replace(char *string, const char *token, const char *fmt,...)
if(dbg->bits==RZ_SYS_BITS_64)
static xtensa_slot_internal slots[]
static const z80_opcode fd[]
int read(izstream &zs, T *x, Items items)
voidpf ZLIB_INTERNAL zcalloc(voidpf opaque, unsigned items, unsigned size)