Rizin
unix-like reverse engineering framework and cli tools
cluster.c
/* Redis Cluster implementation.
 *
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "server.h"
#include "cluster.h"
#include "endianconv.h"

#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <math.h>

/* A global reference to myself is handy to make code more clear.
 * Myself always points to server.cluster->myself, that is, the clusterNode
 * that represents this node. */
clusterNode *myself = NULL;

clusterNode *createClusterNode(char *nodename, int flags);
int clusterAddNode(clusterNode *node);
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void clusterSendPing(clusterLink *link, int type);
void clusterSendFail(char *nodename);
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
void clusterUpdateState(void);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
sds clusterGenNodesDescription(int filter);
clusterNode *clusterLookupNode(char *name);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
int clusterDelNodeSlots(clusterNode *node);
int clusterNodeSetSlotBit(clusterNode *n, int slot);
void clusterSetMaster(clusterNode *n);
void clusterHandleSlaveFailover(void);
void clusterHandleSlaveMigration(int max_slaves);
int bitmapTestBit(unsigned char *bitmap, int pos);
void clusterDoBeforeSleep(int flags);
void clusterSendUpdate(clusterLink *link, clusterNode *node);
void resetManualFailover(void);
void clusterCloseAllSlots(void);
void clusterSetNodeAsMaster(clusterNode *n);
void clusterDelNode(clusterNode *delnode);
sds representClusterNodeFlags(sds ci, uint16_t flags);
uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);

/* -----------------------------------------------------------------------------
 * Initialization
 * -------------------------------------------------------------------------- */

/* Load the cluster config from 'filename'.
 *
 * If the file does not exist or is zero-length (this may happen because
 * when we lock the nodes.conf file, we create a zero-length one for the
 * sake of locking if it does not already exist), C_ERR is returned.
 * If the configuration was loaded from the file, C_OK is returned. */
int clusterLoadConfig(char *filename) {
    FILE *fp = fopen(filename,"r");
    struct stat sb;
    char *line;
    int maxline, j;

    if (fp == NULL) {
        if (errno == ENOENT) {
            return C_ERR;
        } else {
            serverLog(LL_WARNING,
                "Loading the cluster node config from %s: %s",
                filename, strerror(errno));
            exit(1);
        }
    }

    /* Check if the file is zero-length: if so return C_ERR to signal
     * we have to write the config. */
    if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
        fclose(fp);
        return C_ERR;
    }

    /* Parse the file. Note that single lines of the cluster config file can
     * be really long as they include all the hash slots of the node.
     * This means that in the worst possible case half of the Redis slots will
     * be present in a single line, possibly in importing or migrating state,
     * each together with the node ID of the sender/receiver.
     *
     * To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
    maxline = 1024+CLUSTER_SLOTS*128;
    line = zmalloc(maxline);
    while(fgets(line,maxline,fp) != NULL) {
        int argc;
        sds *argv;
        clusterNode *n, *master;
        char *p, *s;

        /* Skip blank lines, they can be created either by users manually
         * editing nodes.conf or by the config writing process if stopped
         * before the truncate() call. */
        if (line[0] == '\n' || line[0] == '\0') continue;

        /* Split the line into arguments for processing. */
        argv = sdssplitargs(line,&argc);
        if (argv == NULL) goto fmterr;

        /* Handle the special "vars" line. Don't pretend it is the last
         * line even if it actually is when generated by Redis. */
        if (strcasecmp(argv[0],"vars") == 0) {
            for (j = 1; j < argc; j += 2) {
                if (strcasecmp(argv[j],"currentEpoch") == 0) {
                    server.cluster->currentEpoch =
                        strtoull(argv[j+1],NULL,10);
                } else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
                    server.cluster->lastVoteEpoch =
                        strtoull(argv[j+1],NULL,10);
                } else {
                    serverLog(LL_WARNING,
                        "Skipping unknown cluster config variable '%s'",
                        argv[j]);
                }
            }
            sdsfreesplitres(argv,argc);
            continue;
        }

        /* Regular config lines have at least eight fields */
        if (argc < 8) goto fmterr;

        /* Create this node if it does not exist */
        n = clusterLookupNode(argv[0]);
        if (!n) {
            n = createClusterNode(argv[0],0);
            clusterAddNode(n);
        }
        /* Address and port */
        if ((p = strrchr(argv[1],':')) == NULL) goto fmterr;
        *p = '\0';
        memcpy(n->ip,argv[1],strlen(argv[1])+1);
        char *port = p+1;
        char *busp = strchr(port,'@');
        if (busp) {
            *busp = '\0';
            busp++;
        }
        n->port = atoi(port);
        /* In older versions of nodes.conf the "@busport" part is missing.
         * In this case we set it to the default offset of 10000 from the
         * base port. */
        n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;

        /* Parse flags */
        p = s = argv[2];
        while(p) {
            p = strchr(s,',');
            if (p) *p = '\0';
            if (!strcasecmp(s,"myself")) {
                serverAssert(server.cluster->myself == NULL);
                myself = server.cluster->myself = n;
                n->flags |= CLUSTER_NODE_MYSELF;
            } else if (!strcasecmp(s,"master")) {
                n->flags |= CLUSTER_NODE_MASTER;
            } else if (!strcasecmp(s,"slave")) {
                n->flags |= CLUSTER_NODE_SLAVE;
            } else if (!strcasecmp(s,"fail?")) {
                n->flags |= CLUSTER_NODE_PFAIL;
            } else if (!strcasecmp(s,"fail")) {
                n->flags |= CLUSTER_NODE_FAIL;
                n->fail_time = mstime();
            } else if (!strcasecmp(s,"handshake")) {
                n->flags |= CLUSTER_NODE_HANDSHAKE;
            } else if (!strcasecmp(s,"noaddr")) {
                n->flags |= CLUSTER_NODE_NOADDR;
            } else if (!strcasecmp(s,"noflags")) {
                /* nothing to do */
            } else {
                serverPanic("Unknown flag in redis cluster config file");
            }
            if (p) s = p+1;
        }

        /* Get master if any. Set the master and populate master's
         * slave list. */
        if (argv[3][0] != '-') {
            master = clusterLookupNode(argv[3]);
            if (!master) {
                master = createClusterNode(argv[3],0);
                clusterAddNode(master);
            }
            n->slaveof = master;
            clusterNodeAddSlave(master,n);
        }

        /* Set ping sent / pong received timestamps */
        if (atoi(argv[4])) n->ping_sent = mstime();
        if (atoi(argv[5])) n->pong_received = mstime();

        /* Set configEpoch for this node. */
        n->configEpoch = strtoull(argv[6],NULL,10);

        /* Populate hash slots served by this instance. */
        for (j = 8; j < argc; j++) {
            int start, stop;

            if (argv[j][0] == '[') {
                /* Here we handle migrating / importing slots */
                int slot;
                char direction;
                clusterNode *cn;

                p = strchr(argv[j],'-');
                serverAssert(p != NULL);
                *p = '\0';
                direction = p[1]; /* Either '>' or '<' */
                slot = atoi(argv[j]+1);
                p += 3;
                cn = clusterLookupNode(p);
                if (!cn) {
                    cn = createClusterNode(p,0);
                    clusterAddNode(cn);
                }
                if (direction == '>') {
                    server.cluster->migrating_slots_to[slot] = cn;
                } else {
                    server.cluster->importing_slots_from[slot] = cn;
                }
                continue;
            } else if ((p = strchr(argv[j],'-')) != NULL) {
                *p = '\0';
                start = atoi(argv[j]);
                stop = atoi(p+1);
            } else {
                start = stop = atoi(argv[j]);
            }
            while(start <= stop) clusterAddSlot(n, start++);
        }

        sdsfreesplitres(argv,argc);
    }
    /* Config sanity check */
    if (server.cluster->myself == NULL) goto fmterr;

    zfree(line);
    fclose(fp);

    serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);

    /* Something that should never happen: currentEpoch smaller than
     * the max epoch found in the nodes configuration. However we handle this
     * as some form of protection against manual editing of critical files. */
    if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
        server.cluster->currentEpoch = clusterGetMaxEpoch();
    }
    return C_OK;

fmterr:
    serverLog(LL_WARNING,
        "Unrecoverable error: corrupted cluster config file.");
    zfree(line);
    if (fp) fclose(fp);
    exit(1);
}
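
/* Illustrative example (not part of the original file): a nodes.conf that
 * the parser above accepts. Node IDs and addresses are invented; the format
 * is exactly the CLUSTER NODES output plus the trailing "vars" line:
 *
 *   e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@40001 myself,master - 0 0 1 connected 0-5460
 *   67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1:30002@40002 master - 0 1426238316232 2 connected 5461-10922 [5462->-e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca]
 *   07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30004@40004 slave 67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 0 1426238317239 2 connected
 *   vars currentEpoch 6 lastVoteEpoch 0
 *
 * The bracketed token is a slot in migrating ('>') or importing ('<') state,
 * followed by the node ID of the other side of the migration. */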

/* Cluster node configuration is exactly the same as CLUSTER NODES output.
 *
 * This function writes the node config and returns 0; on error -1
 * is returned.
 *
 * Note: we need to write the file in an atomic way from the point of view
 * of the POSIX filesystem semantics, so that if the server is stopped
 * or crashes during the write, we'll end up with either the old file or the
 * new one. Since we have the full payload to write available we can use
 * a single write to write the whole file. If the pre-existing file was
 * bigger we pad our payload with newlines that are anyway ignored and truncate
 * the file afterward. */
int clusterSaveConfig(int do_fsync) {
    sds ci;
    size_t content_size;
    struct stat sb;
    int fd;

    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;

    /* Get the nodes description and concatenate our "vars" directive to
     * save currentEpoch and lastVoteEpoch. */
    ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE);
    ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
        (unsigned long long) server.cluster->currentEpoch,
        (unsigned long long) server.cluster->lastVoteEpoch);
    content_size = sdslen(ci);

    if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
        == -1) goto err;

    /* Pad the new payload if the existing file length is greater. */
    if (fstat(fd,&sb) != -1) {
        if (sb.st_size > (off_t)content_size) {
            ci = sdsgrowzero(ci,sb.st_size);
            memset(ci+content_size,'\n',sb.st_size-content_size);
        }
    }
    if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
    if (do_fsync) {
        server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
        fsync(fd);
    }

    /* Truncate the file if needed to remove the final \n padding that
     * is just garbage. */
    if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
        /* ftruncate() failing is not a critical error. */
    }
    close(fd);
    sdsfree(ci);
    return 0;

err:
    if (fd != -1) close(fd);
    sdsfree(ci);
    return -1;
}
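
/* Worked example of the padding scheme above (illustrative sizes): if the
 * pre-existing nodes.conf is 1200 bytes and the new payload is 1000 bytes,
 * the payload is grown to 1200 bytes by appending 200 '\n' bytes, written
 * with a single write(2) call, and ftruncate() finally trims the file back
 * to 1000 bytes. Blank lines produced by a crash between the write and the
 * truncate are harmless: clusterLoadConfig() skips them. */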

void clusterSaveConfigOrDie(int do_fsync) {
    if (clusterSaveConfig(do_fsync) == -1) {
        serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
        exit(1);
    }
}

/* Lock the cluster config using flock(), and leak the file descriptor used
 * to acquire the lock so that the file will stay locked forever.
 *
 * This works because we always update nodes.conf with a new version
 * in-place, reopening the file, and writing to it in place (later adjusting
 * the length with ftruncate()).
 *
 * On success C_OK is returned, otherwise an error is logged and
 * the function returns C_ERR to signal a lock was not acquired. */
int clusterLockConfig(char *filename) {
/* flock() does not exist on Solaris
 * and a fcntl-based solution won't help, as we constantly re-open that file,
 * which will release _all_ locks anyway
 */
#if !defined(__sun)
    /* To lock it, we need to open the file in a way it is created if
     * it does not exist, otherwise there is a race condition with other
     * processes. */
    int fd = open(filename,O_WRONLY|O_CREAT,0644);
    if (fd == -1) {
        serverLog(LL_WARNING,
            "Can't open %s in order to acquire a lock: %s",
            filename, strerror(errno));
        return C_ERR;
    }

    if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
        if (errno == EWOULDBLOCK) {
            serverLog(LL_WARNING,
                "Sorry, the cluster configuration file %s is already used "
                "by a different Redis Cluster node. Please make sure that "
                "different nodes use different cluster configuration "
                "files.", filename);
        } else {
            serverLog(LL_WARNING,
                "Impossible to lock %s: %s", filename, strerror(errno));
        }
        close(fd);
        return C_ERR;
    }
    /* Lock acquired: leak the 'fd' by not closing it, so that we'll retain
     * the lock to the file as long as the process exists. */
#endif /* __sun */

    return C_OK;
}

void clusterInit(void) {
    int saveconf = 0;

    server.cluster = zmalloc(sizeof(clusterState));
    server.cluster->myself = NULL;
    server.cluster->currentEpoch = 0;
    server.cluster->state = CLUSTER_FAIL;
    server.cluster->size = 1;
    server.cluster->todo_before_sleep = 0;
    server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
    server.cluster->nodes_black_list =
        dictCreate(&clusterNodesBlackListDictType,NULL);
    server.cluster->failover_auth_time = 0;
    server.cluster->failover_auth_count = 0;
    server.cluster->failover_auth_rank = 0;
    server.cluster->failover_auth_epoch = 0;
    server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
    server.cluster->lastVoteEpoch = 0;
    for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
        server.cluster->stats_bus_messages_sent[i] = 0;
        server.cluster->stats_bus_messages_received[i] = 0;
    }
    server.cluster->stats_pfail_nodes = 0;
    memset(server.cluster->slots,0, sizeof(server.cluster->slots));
    clusterCloseAllSlots();

    /* Lock the cluster config file to make sure every node uses
     * its own nodes.conf. */
    if (clusterLockConfig(server.cluster_configfile) == C_ERR)
        exit(1);

    /* Load or create a new nodes configuration. */
    if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
        /* No configuration found. We will just use the random name provided
         * by the createClusterNode() function. */
        myself = server.cluster->myself =
            createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
        serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
            myself->name);
        clusterAddNode(myself);
        saveconf = 1;
    }
    if (saveconf) clusterSaveConfigOrDie(1);

    /* We need a listening TCP port for our cluster messaging needs. */
    server.cfd_count = 0;

    /* Port sanity check II
     * The other handshake port check is triggered too late to stop
     * us from trying to use a too-high cluster port number. */
    if (server.port > (65535-CLUSTER_PORT_INCR)) {
        serverLog(LL_WARNING, "Redis port number too high. "
                   "Cluster communication port is 10,000 port "
                   "numbers higher than your Redis port. "
                   "Your Redis port number must be "
                   "lower than 55535.");
        exit(1);
    }

    if (listenToPort(server.port+CLUSTER_PORT_INCR,
        server.cfd,&server.cfd_count) == C_ERR)
    {
        exit(1);
    } else {
        int j;

        for (j = 0; j < server.cfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
                clusterAcceptHandler, NULL) == AE_ERR)
                    serverPanic("Unrecoverable error creating Redis Cluster "
                                "file event.");
        }
    }

    /* The slots -> keys map is a radix tree. Initialize it here. */
    server.cluster->slots_to_keys = raxNew();
    memset(server.cluster->slots_keys_count,0,
           sizeof(server.cluster->slots_keys_count));

    /* Set myself->port / cport to my listening ports, we'll just need to
     * discover the IP address via MEET messages. */
    myself->port = server.port;
    myself->cport = server.port+CLUSTER_PORT_INCR;
    if (server.cluster_announce_port)
        myself->port = server.cluster_announce_port;
    if (server.cluster_announce_bus_port)
        myself->cport = server.cluster_announce_bus_port;

    server.cluster->mf_end = 0;
    resetManualFailover();
}

/* Reset a node performing a soft or hard reset:
 *
 * 1) All other nodes are forgotten.
 * 2) All the assigned / open slots are released.
 * 3) If the node is a slave, it turns into a master.
 * 4) Only for hard reset: a new Node ID is generated.
 * 5) Only for hard reset: currentEpoch and configEpoch are set to 0.
 * 6) The new configuration is saved and the cluster state updated.
 * 7) If the node was a slave, the whole data set is flushed away. */
void clusterReset(int hard) {
    dictIterator *di;
    dictEntry *de;
    int j;

    /* Turn into master. */
    if (nodeIsSlave(myself)) {
        clusterSetNodeAsMaster(myself);
        replicationUnsetMaster();
        emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
    }

    /* Close slots, reset manual failover state. */
    clusterCloseAllSlots();
    resetManualFailover();

    /* Unassign all the slots. */
    for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j);

    /* Forget all the nodes, but myself. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (node == myself) continue;
        clusterDelNode(node);
    }
    dictReleaseIterator(di);

    /* Hard reset only: set epochs to 0, change node ID. */
    if (hard) {
        sds oldname;

        server.cluster->currentEpoch = 0;
        server.cluster->lastVoteEpoch = 0;
        myself->configEpoch = 0;
        serverLog(LL_WARNING, "configEpoch set to 0 via CLUSTER RESET HARD");

        /* To change the Node ID we need to remove the old name from the
         * nodes table, change the ID, and re-add back with new name. */
        oldname = sdsnewlen(myself->name, CLUSTER_NAMELEN);
        dictDelete(server.cluster->nodes,oldname);
        sdsfree(oldname);
        getRandomHexChars(myself->name, CLUSTER_NAMELEN);
        clusterAddNode(myself);
        serverLog(LL_NOTICE,"Node hard reset, now I'm %.40s", myself->name);
    }

    /* Make sure to persist the new config and update the state. */
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                         CLUSTER_TODO_UPDATE_STATE|
                         CLUSTER_TODO_FSYNC_CONFIG);
}
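
/* Usage note (illustrative): in Redis this function backs the CLUSTER RESET
 * command, e.g.:
 *
 *   redis-cli -p 30001 CLUSTER RESET SOFT   ->  clusterReset(0)
 *   redis-cli -p 30001 CLUSTER RESET HARD   ->  clusterReset(1)
 */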

/* -----------------------------------------------------------------------------
 * CLUSTER communication link
 * -------------------------------------------------------------------------- */

clusterLink *createClusterLink(clusterNode *node) {
    clusterLink *link = zmalloc(sizeof(*link));
    link->ctime = mstime();
    link->sndbuf = sdsempty();
    link->rcvbuf = sdsempty();
    link->node = node;
    link->fd = -1;
    return link;
}

/* Free a cluster link, but does not free the associated node of course.
 * This function will just make sure that the original node associated
 * with this link will have the 'link' field set to NULL. */
void freeClusterLink(clusterLink *link) {
    if (link->fd != -1) {
        aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
        aeDeleteFileEvent(server.el, link->fd, AE_READABLE);
    }
    sdsfree(link->sndbuf);
    sdsfree(link->rcvbuf);
    if (link->node)
        link->node->link = NULL;
    close(link->fd);
    zfree(link);
}

#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    clusterLink *link;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    /* If the server is starting up, don't accept cluster connections:
     * UPDATE messages may interact with the database content. */
    if (server.masterhost == NULL && server.loading) return;

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_VERBOSE,
                    "Error accepting cluster node: %s", server.neterr);
            return;
        }
        anetNonBlock(NULL,cfd);
        anetEnableTcpNoDelay(NULL,cfd);

        /* Use non-blocking I/O for cluster messages. */
        serverLog(LL_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
        /* Create a link object we use to handle the connection.
         * It gets passed to the readable handler when data is available.
         * Initially the link->node pointer is set to NULL as we don't know
         * which node it is; the right node is referenced once we know the
         * node's identity. */
        link = createClusterLink(NULL);
        link->fd = cfd;
        aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
    }
}

/* -----------------------------------------------------------------------------
 * Key space handling
 * -------------------------------------------------------------------------- */

/* We have 16384 hash slots. The hash slot of a given key is obtained
 * as the least significant 14 bits of the crc16 of the key.
 *
 * However if the key contains the {...} pattern, only the part between
 * { and } is hashed. This may be useful in the future to force certain
 * keys to be in the same node (assuming no resharding is in progress). */
unsigned int keyHashSlot(char *key, int keylen) {
    int s, e; /* start-end indexes of { and } */

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;

    /* No '{' ? Hash the whole key. This is the base case. */
    if (s == keylen) return crc16(key,keylen) & 0x3FFF;

    /* '{' found? Check if we have the corresponding '}'. */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;

    /* No '}' or nothing between {} ? Hash the whole key. */
    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;

    /* If we are here there is both a { and a } on its right. Hash
     * what is in the middle between { and }. */
    return crc16(key+s+1,e-s-1) & 0x3FFF;
}
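
/* Standalone sketch (not part of the original file) of the slot math above.
 * It uses a bitwise CRC-16/XMODEM (poly 0x1021, init 0) in place of Redis's
 * table-driven crc16(); the Redis Cluster spec gives the test vector
 * crc16("123456789") == 0x31C3, i.e. slot 12739. Guarded with #if 0 here;
 * copy it out and compile separately, e.g. cc -o slotdemo slotdemo.c */
#if 0
#include <stdio.h>
#include <stdint.h>
#include <string.h>

static uint16_t crc16_xmodem(const char *buf, int len) {
    uint16_t crc = 0;
    for (int i = 0; i < len; i++) {
        crc ^= (uint16_t)((unsigned char)buf[i]) << 8;
        for (int b = 0; b < 8; b++)
            crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021)
                                 : (uint16_t)(crc << 1);
    }
    return crc;
}

/* Same hash-tag rule as keyHashSlot(): hash only the non-empty {...} part. */
static unsigned int demo_hash_slot(const char *key) {
    int keylen = (int)strlen(key), s, e;
    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;
    if (s == keylen) return crc16_xmodem(key,keylen) & 0x3FFF;
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;
    if (e == keylen || e == s+1) return crc16_xmodem(key,keylen) & 0x3FFF;
    return crc16_xmodem(key+s+1,e-s-1) & 0x3FFF;
}

int main(void) {
    /* Both keys hash only "user1000", so they map to the same slot. */
    printf("%u\n", demo_hash_slot("{user1000}.following"));
    printf("%u\n", demo_hash_slot("{user1000}.followers"));
    printf("%u\n", demo_hash_slot("123456789")); /* prints 12739 (0x31C3) */
    return 0;
}
#endif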

/* -----------------------------------------------------------------------------
 * CLUSTER node API
 * -------------------------------------------------------------------------- */

/* Create a new cluster node, with the specified flags.
 * If "nodename" is NULL this is considered a first handshake and a random
 * node name is assigned to this node (it will be fixed later when we
 * receive the first pong).
 *
 * The node is created and returned to the user, but it is not automatically
 * added to the nodes hash table. */
clusterNode *createClusterNode(char *nodename, int flags) {
    clusterNode *node = zmalloc(sizeof(*node));

    if (nodename)
        memcpy(node->name, nodename, CLUSTER_NAMELEN);
    else
        getRandomHexChars(node->name, CLUSTER_NAMELEN);
    node->ctime = mstime();
    node->configEpoch = 0;
    node->flags = flags;
    memset(node->slots,0,sizeof(node->slots));
    node->numslots = 0;
    node->numslaves = 0;
    node->slaves = NULL;
    node->slaveof = NULL;
    node->ping_sent = node->pong_received = 0;
    node->fail_time = 0;
    node->link = NULL;
    memset(node->ip,0,sizeof(node->ip));
    node->port = 0;
    node->cport = 0;
    node->fail_reports = listCreate();
    node->voted_time = 0;
    node->orphaned_time = 0;
    node->repl_offset_time = 0;
    node->repl_offset = 0;
    listSetFreeMethod(node->fail_reports,zfree);
    return node;
}

/* This function is called every time we get a failure report from a node.
 * The side effect is to populate the fail_reports list (or to update
 * the timestamp of an existing report).
 *
 * 'failing' is the node that is in failure state according to the
 * 'sender' node.
 *
 * The function returns 0 if it just updates the timestamp of an existing
 * failure report from the same sender. 1 is returned if a new failure
 * report is created. */
int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
    list *l = failing->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;

    /* If a failure report from the same sender already exists, just update
     * the timestamp. */
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (fr->node == sender) {
            fr->time = mstime();
            return 0;
        }
    }

    /* Otherwise create a new report. */
    fr = zmalloc(sizeof(*fr));
    fr->node = sender;
    fr->time = mstime();
    listAddNodeTail(l,fr);
    return 1;
}

/* Remove failure reports that are too old, where too old means reasonably
 * older than the global node timeout. Note that anyway for a node to be
 * flagged as FAIL we need to have a local PFAIL state that is at least
 * older than the global node timeout, so we don't just trust the number
 * of failure reports from other nodes. */
void clusterNodeCleanupFailureReports(clusterNode *node) {
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;
    mstime_t maxtime = server.cluster_node_timeout *
                       CLUSTER_FAIL_REPORT_VALIDITY_MULT;
    mstime_t now = mstime();

    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (now - fr->time > maxtime) listDelNode(l,ln);
    }
}

/* Remove the failing report for 'node' if it was previously considered
 * failing by 'sender'. This function is called when a node informs us via
 * gossip that a node is OK from its point of view (no FAIL or PFAIL flags).
 *
 * Note that this function is called relatively often as it gets called even
 * when there are no nodes failing, and is O(N), however when the cluster is
 * fine the failure reports list is empty so the function runs in constant
 * time.
 *
 * The function returns 1 if the failure report was found and removed.
 * Otherwise 0 is returned. */
int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) {
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;

    /* Search for a failure report from this sender. */
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (fr->node == sender) break;
    }
    if (!ln) return 0; /* No failure report from this sender. */

    /* Remove the failure report. */
    listDelNode(l,ln);
    clusterNodeCleanupFailureReports(node);
    return 1;
}

/* Return the number of external nodes that believe 'node' is failing,
 * not including this node, that may have a PFAIL or FAIL state for this
 * node as well. */
int clusterNodeFailureReportsCount(clusterNode *node) {
    clusterNodeCleanupFailureReports(node);
    return listLength(node->fail_reports);
}

int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
    int j;

    for (j = 0; j < master->numslaves; j++) {
        if (master->slaves[j] == slave) {
            if ((j+1) < master->numslaves) {
                int remaining_slaves = (master->numslaves - j) - 1;
                memmove(master->slaves+j,master->slaves+(j+1),
                        (sizeof(*master->slaves) * remaining_slaves));
            }
            master->numslaves--;
            if (master->numslaves == 0)
                master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
            return C_OK;
        }
    }
    return C_ERR;
}

int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
    int j;

    /* If it's already a slave, don't add it again. */
    for (j = 0; j < master->numslaves; j++)
        if (master->slaves[j] == slave) return C_ERR;
    master->slaves = zrealloc(master->slaves,
        sizeof(clusterNode*)*(master->numslaves+1));
    master->slaves[master->numslaves] = slave;
    master->numslaves++;
    master->flags |= CLUSTER_NODE_MIGRATE_TO;
    return C_OK;
}

int clusterCountNonFailingSlaves(clusterNode *n) {
    int j, okslaves = 0;

    for (j = 0; j < n->numslaves; j++)
        if (!nodeFailed(n->slaves[j])) okslaves++;
    return okslaves;
}

/* Low level cleanup of the node structure. Only called by clusterDelNode(). */
void freeClusterNode(clusterNode *n) {
    sds nodename;
    int j;

    /* If the node has associated slaves, we have to set
     * all the slaves->slaveof fields to NULL (unknown). */
    for (j = 0; j < n->numslaves; j++)
        n->slaves[j]->slaveof = NULL;

    /* Remove this node from the list of slaves of its master. */
    if (nodeIsSlave(n) && n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);

    /* Unlink from the set of nodes. */
    nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
    serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
    sdsfree(nodename);

    /* Release link and associated data structures. */
    if (n->link) freeClusterLink(n->link);
    listRelease(n->fail_reports);
    zfree(n->slaves);
    zfree(n);
}

/* Add a node to the nodes hash table */
int clusterAddNode(clusterNode *node) {
    int retval;

    retval = dictAdd(server.cluster->nodes,
            sdsnewlen(node->name,CLUSTER_NAMELEN), node);
    return (retval == DICT_OK) ? C_OK : C_ERR;
}

/* Remove a node from the cluster. The function performs the high level
 * cleanup, calling freeClusterNode() for the low level cleanup.
 * Here we do the following:
 *
 * 1) Mark all the slots handled by it as unassigned.
 * 2) Remove all the failure reports sent by this node and referenced by
 *    other nodes.
 * 3) Free the node with freeClusterNode() that will in turn remove it
 *    from the hash table and from the list of slaves of its master, if
 *    it is a slave node.
 */
void clusterDelNode(clusterNode *delnode) {
    int j;
    dictIterator *di;
    dictEntry *de;

    /* 1) Mark slots as unassigned. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (server.cluster->importing_slots_from[j] == delnode)
            server.cluster->importing_slots_from[j] = NULL;
        if (server.cluster->migrating_slots_to[j] == delnode)
            server.cluster->migrating_slots_to[j] = NULL;
        if (server.cluster->slots[j] == delnode)
            clusterDelSlot(j);
    }

    /* 2) Remove failure reports. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (node == delnode) continue;
        clusterNodeDelFailureReport(node,delnode);
    }
    dictReleaseIterator(di);

    /* 3) Free the node, unlinking it from the cluster. */
    freeClusterNode(delnode);
}

/* Node lookup by name */
clusterNode *clusterLookupNode(char *name) {
    sds s = sdsnewlen(name, CLUSTER_NAMELEN);
    dictEntry *de;

    de = dictFind(server.cluster->nodes,s);
    sdsfree(s);
    if (de == NULL) return NULL;
    return dictGetVal(de);
}

/* This is only used after the handshake. When we connect a given IP/PORT
 * as a result of CLUSTER MEET we don't have the node name yet, so we
 * pick a random one, and will fix it when we receive the PONG using
 * this function. */
void clusterRenameNode(clusterNode *node, char *newname) {
    int retval;
    sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);

    serverLog(LL_DEBUG,"Renaming node %.40s into %.40s",
        node->name, newname);
    retval = dictDelete(server.cluster->nodes, s);
    sdsfree(s);
    serverAssert(retval == DICT_OK);
    memcpy(node->name, newname, CLUSTER_NAMELEN);
    clusterAddNode(node);
}

/* -----------------------------------------------------------------------------
 * CLUSTER config epoch handling
 * -------------------------------------------------------------------------- */

/* Return the greatest configEpoch found in the cluster, or the current
 * epoch if greater than any node configEpoch. */
uint64_t clusterGetMaxEpoch(void) {
    uint64_t max = 0;
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        if (node->configEpoch > max) max = node->configEpoch;
    }
    dictReleaseIterator(di);
    if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
    return max;
}

/* If this node epoch is zero or is not already the greatest across the
 * cluster (from the POV of the local configuration), this function will:
 *
 * 1) Generate a new config epoch, incrementing the current epoch.
 * 2) Assign the new epoch to this node, WITHOUT any consensus.
 * 3) Persist the configuration on disk before sending packets with the
 *    new configuration.
 *
 * If the new config epoch is generated and assigned, C_OK is returned,
 * otherwise C_ERR is returned (since the node already has the greatest
 * configuration around) and no operation is performed.
 *
 * Important note: this function violates the principle that config epochs
 * should be generated with consensus and should be unique across the cluster.
 * However Redis Cluster uses these auto-generated new config epochs in two
 * cases:
 *
 * 1) When slots are closed after importing. Otherwise resharding would be
 *    too expensive.
 * 2) When CLUSTER FAILOVER is called with options that force a slave to
 *    failover its master even if there is no master majority able to
 *    create a new configuration epoch.
 *
 * Redis Cluster will not explode using this function, even in the case of
 * a collision between this node and another node, generating the same
 * configuration epoch unilaterally, because the config epoch conflict
 * resolution algorithm will eventually move colliding nodes to different
 * config epochs. However using this function may violate the "last failover
 * wins" rule, so it should only be used with care. */
int clusterBumpConfigEpochWithoutConsensus(void) {
    uint64_t maxEpoch = clusterGetMaxEpoch();

    if (myself->configEpoch == 0 ||
        myself->configEpoch != maxEpoch)
    {
        server.cluster->currentEpoch++;
        myself->configEpoch = server.cluster->currentEpoch;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_FSYNC_CONFIG);
        serverLog(LL_WARNING,
            "New configEpoch set to %llu",
            (unsigned long long) myself->configEpoch);
        return C_OK;
    } else {
        return C_ERR;
    }
}

/* This function is called when this node is a master, and we receive from
 * another master a configuration epoch that is equal to our configuration
 * epoch.
 *
 * BACKGROUND
 *
 * It is not possible that different slaves get the same config
 * epoch during a failover election, because the slaves need to get voted
 * by a majority. However when we perform a manual resharding of the cluster
 * the node will assign a configuration epoch to itself without asking
 * for agreement. Usually resharding happens when the cluster is working well
 * and is supervised by the sysadmin, however it is possible for a failover
 * to happen exactly while the node we are resharding a slot to assigns itself
 * a new configuration epoch, but before it is able to propagate it.
 *
 * So technically it is possible in this condition that two nodes end up with
 * the same configuration epoch.
 *
 * Another possibility is that there are bugs in the implementation causing
 * this to happen.
 *
 * Moreover when a new cluster is created, all the nodes start with the same
 * configEpoch. This collision resolution code allows nodes to automatically
 * end up with a different configEpoch at startup.
 *
 * In all the cases, we want a mechanism that resolves this issue automatically
 * as a safeguard. The same configuration epoch for masters serving different
 * sets of slots is not harmful, but it is if the nodes end up serving the same
 * slots for some reason (manual errors or software bugs) without a proper
 * failover procedure.
 *
 * In general we want a system that eventually always ends with different
 * masters having different configuration epochs whatever happened, since
 * nothing is worse than a split-brain condition in a distributed system.
 *
 * BEHAVIOR
 *
 * When this function gets called, what happens is that if this node
 * has the lexicographically smaller Node ID compared to the other node
 * with the conflicting epoch (the 'sender' node), it will assign itself
 * the greatest configuration epoch currently detected among nodes plus 1.
 *
 * This means that even if there are multiple nodes colliding, the node
 * with the greatest Node ID never moves forward, so eventually all the nodes
 * end with a different configuration epoch.
 */
void clusterHandleConfigEpochCollision(clusterNode *sender) {
    /* Prerequisites: nodes have the same configEpoch and are both masters. */
    if (sender->configEpoch != myself->configEpoch ||
        !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
    /* Don't act if the colliding node has a smaller Node ID. */
    if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;
    /* Get the next ID available at the best of this node knowledge. */
    server.cluster->currentEpoch++;
    myself->configEpoch = server.cluster->currentEpoch;
    clusterSaveConfigOrDie(1);
    serverLog(LL_VERBOSE,
        "WARNING: configEpoch collision with node %.40s."
        " configEpoch set to %llu",
        sender->name,
        (unsigned long long) myself->configEpoch);
}
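
/* Worked example (illustrative IDs, shortened from 40 hex chars): masters
 * A=00a1... and B=ff02... both end up with configEpoch 5, currentEpoch 5.
 * When A processes a packet from B it detects the collision; since
 * memcmp("00a1...","ff02...") < 0, A is the node with the smaller ID, so
 * currentEpoch becomes 6 and A takes configEpoch 6, while B keeps 5. With
 * more than two colliding nodes the same rule converges, because only the
 * greatest Node ID never moves forward. */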

/* -----------------------------------------------------------------------------
 * CLUSTER nodes blacklist
 *
 * The nodes blacklist is just a way to ensure that a given node with a given
 * Node ID is not re-added before some time has elapsed (this time is
 * specified in seconds in CLUSTER_BLACKLIST_TTL).
 *
 * This is useful when we want to remove a node from the cluster completely:
 * when CLUSTER FORGET is called, it also puts the node into the blacklist so
 * that even if we receive gossip messages from other nodes that still remember
 * about the node we want to remove, we don't re-add it before some time.
 *
 * Currently the CLUSTER_BLACKLIST_TTL is set to 1 minute, which means
 * that redis-trib has 60 seconds to send CLUSTER FORGET messages to nodes
 * in the cluster without dealing with the problem of other nodes re-adding
 * the node to nodes we already sent the FORGET command to.
 *
 * The data structure used is a hash table with an sds string representing
 * the node ID as key, and the time when it is ok to re-add the node as
 * value.
 * -------------------------------------------------------------------------- */

#define CLUSTER_BLACKLIST_TTL 60 /* 1 minute. */


/* Before the addNode() or Exists() operations we always remove expired
 * entries from the black list. This is an O(N) operation but it is not a
 * problem since add / exists operations are called very infrequently and
 * the hash table is supposed to contain very few elements at most.
 * However without the cleanup, during long uptimes and with some automated
 * node add/removal procedures, entries could accumulate. */
void clusterBlacklistCleanup(void) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes_black_list);
    while((de = dictNext(di)) != NULL) {
        int64_t expire = dictGetUnsignedIntegerVal(de);

        if (expire < server.unixtime)
            dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
    }
    dictReleaseIterator(di);
}

/* Cleanup the blacklist and add a new node ID to the black list. */
void clusterBlacklistAddNode(clusterNode *node) {
    dictEntry *de;
    sds id = sdsnewlen(node->name,CLUSTER_NAMELEN);

    clusterBlacklistCleanup();
    if (dictAdd(server.cluster->nodes_black_list,id,NULL) == DICT_OK) {
        /* If the key was added, duplicate the sds string representation of
         * the key for the next lookup. We'll free it at the end. */
        id = sdsdup(id);
    }
    de = dictFind(server.cluster->nodes_black_list,id);
    dictSetUnsignedIntegerVal(de,time(NULL)+CLUSTER_BLACKLIST_TTL);
    sdsfree(id);
}

/* Return non-zero if the specified node ID exists in the blacklist.
 * You don't need to pass an sds string here, any pointer to 40 bytes
 * will work. */
int clusterBlacklistExists(char *nodeid) {
    sds id = sdsnewlen(nodeid,CLUSTER_NAMELEN);
    int retval;

    clusterBlacklistCleanup();
    retval = dictFind(server.cluster->nodes_black_list,id) != NULL;
    sdsfree(id);
    return retval;
}
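
/* Usage sketch (illustrative): in Redis, CLUSTER FORGET <node-id> calls
 * clusterBlacklistAddNode() before deleting the node, so that for the next
 * CLUSTER_BLACKLIST_TTL seconds the gossip code can test
 * clusterBlacklistExists(g->nodename) and refuse to re-add a node it still
 * hears about from peers that were not informed yet. */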

/* -----------------------------------------------------------------------------
 * CLUSTER messages exchange - PING/PONG and gossip
 * -------------------------------------------------------------------------- */

/* This function checks if a given node should be marked as FAIL.
 * It happens if the following conditions are met:
 *
 * 1) We received enough failure reports from other master nodes via gossip.
 *    Enough means that the majority of the masters signaled the node is
 *    down recently.
 * 2) We believe this node is in PFAIL state.
 *
 * If a failure is detected we also inform the whole cluster about this
 * event trying to force every other node to set the FAIL flag for the node.
 *
 * Note that the form of agreement used here is weak, as we collect the
 * majority of masters state during some time, and even if we force agreement
 * by propagating the FAIL message, because of partitions we may not reach
 * every node. However:
 *
 * 1) Either we reach the majority and eventually the FAIL state will
 *    propagate to all the cluster.
 * 2) Or there is no majority so no slave promotion will be authorized and the
 *    FAIL flag will be cleared after some time.
 */
void markNodeAsFailingIfNeeded(clusterNode *node) {
    int failures;
    int needed_quorum = (server.cluster->size / 2) + 1;

    if (!nodeTimedOut(node)) return; /* We can reach it. */
    if (nodeFailed(node)) return; /* Already FAILing. */

    failures = clusterNodeFailureReportsCount(node);
    /* Also count myself as a voter if I'm a master. */
    if (nodeIsMaster(myself)) failures++;
    if (failures < needed_quorum) return; /* No weak agreement from masters. */

    serverLog(LL_NOTICE,
        "Marking node %.40s as failing (quorum reached).", node->name);

    /* Mark the node as failing. */
    node->flags &= ~CLUSTER_NODE_PFAIL;
    node->flags |= CLUSTER_NODE_FAIL;
    node->fail_time = mstime();

    /* Broadcast the failing node name to everybody, forcing all the other
     * reachable nodes to flag the node as FAIL. */
    if (nodeIsMaster(myself)) clusterSendFail(node->name);
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}
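
/* Example of the quorum math above (illustrative): with server.cluster->size
 * equal to 5 masters, needed_quorum is 5/2+1 = 3. A node we see as PFAIL is
 * promoted to FAIL once we hold fresh failure reports from 2 other masters,
 * since this node counts itself as the third voter when it is a master. */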

/* This function is called only if a node is marked as FAIL, but we are able
 * to reach it again. It checks if there are the conditions to undo the FAIL
 * state. */
void clearNodeFailureIfNeeded(clusterNode *node) {
    mstime_t now = mstime();

    serverAssert(nodeFailed(node));

    /* For slaves we always clear the FAIL flag if we can contact the
     * node again. */
    if (nodeIsSlave(node) || node->numslots == 0) {
        serverLog(LL_NOTICE,
            "Clear FAIL state for node %.40s: %s is reachable again.",
                node->name,
                nodeIsSlave(node) ? "slave" : "master without slots");
        node->flags &= ~CLUSTER_NODE_FAIL;
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
    }

    /* If it is a master and...
     * 1) The FAIL state is old enough.
     * 2) It is still serving slots from our point of view (not failed over).
     * Apparently no one is going to fix these slots, clear the FAIL flag. */
    if (nodeIsMaster(node) && node->numslots > 0 &&
        (now - node->fail_time) >
        (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT))
    {
        serverLog(LL_NOTICE,
            "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
                node->name);
        node->flags &= ~CLUSTER_NODE_FAIL;
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
    }
}

/* Return true if we already have a node in HANDSHAKE state matching the
 * specified ip address and port number. This function is used in order to
 * avoid adding a new handshake node for the same address multiple times. */
int clusterHandshakeInProgress(char *ip, int port, int cport) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (!nodeInHandshake(node)) continue;
        if (!strcasecmp(node->ip,ip) &&
            node->port == port &&
            node->cport == cport) break;
    }
    dictReleaseIterator(di);
    return de != NULL;
}

/* Start a handshake with the specified address if there is not one
 * already in progress. Returns non-zero if the handshake was actually
 * started. On error zero is returned and errno is set to one of the
 * following values:
 *
 * EAGAIN - There is already a handshake in progress for this address.
 * EINVAL - IP or port are not valid. */
int clusterStartHandshake(char *ip, int port, int cport) {
    clusterNode *n;
    char norm_ip[NET_IP_STR_LEN];
    struct sockaddr_storage sa;

    /* IP sanity check */
    if (inet_pton(AF_INET,ip,
            &(((struct sockaddr_in *)&sa)->sin_addr)))
    {
        sa.ss_family = AF_INET;
    } else if (inet_pton(AF_INET6,ip,
            &(((struct sockaddr_in6 *)&sa)->sin6_addr)))
    {
        sa.ss_family = AF_INET6;
    } else {
        errno = EINVAL;
        return 0;
    }

    /* Port sanity check */
    if (port <= 0 || port > 65535 || cport <= 0 || cport > 65535) {
        errno = EINVAL;
        return 0;
    }

    /* Set norm_ip as the normalized string representation of the node
     * IP address. */
    memset(norm_ip,0,NET_IP_STR_LEN);
    if (sa.ss_family == AF_INET)
        inet_ntop(AF_INET,
            (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
            norm_ip,NET_IP_STR_LEN);
    else
        inet_ntop(AF_INET6,
            (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
            norm_ip,NET_IP_STR_LEN);

    if (clusterHandshakeInProgress(norm_ip,port,cport)) {
        errno = EAGAIN;
        return 0;
    }

    /* Add the node with a random address (NULL as first argument to
     * createClusterNode()). Everything will be fixed during the
     * handshake. */
    n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
    memcpy(n->ip,norm_ip,sizeof(n->ip));
    n->port = port;
    n->cport = cport;
    clusterAddNode(n);
    return 1;
}
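
/* Usage sketch (illustrative): in Redis, CLUSTER MEET 192.168.1.10 30001
 * ends up calling clusterStartHandshake("192.168.1.10", 30001,
 * 30001+CLUSTER_PORT_INCR) when no explicit bus port is given. The cron will
 * later connect to the address, send a MEET packet, and clusterRenameNode()
 * replaces the random name once the real one is learned from the PONG. */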

/* Process the gossip section of PING or PONG packets.
 * Note that this function assumes that the packet is already sanity-checked
 * by the caller, not in the content of the gossip section, but in the
 * length. */
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
    uint16_t count = ntohs(hdr->count);
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);

    while(count--) {
        uint16_t flags = ntohs(g->flags);
        clusterNode *node;
        sds ci;

        ci = representClusterNodeFlags(sdsempty(), flags);
        serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d@%d %s",
            g->nodename,
            g->ip,
            ntohs(g->port),
            ntohs(g->cport),
            ci);
        sdsfree(ci);

        /* Update our state according to the gossip sections */
        node = clusterLookupNode(g->nodename);
        if (node) {
            /* We already know this node.
               Handle failure reports, only when the sender is a master. */
            if (sender && nodeIsMaster(sender) && node != myself) {
                if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
                    if (clusterNodeAddFailureReport(node,sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s as not reachable.",
                            sender->name, node->name);
                    }
                    markNodeAsFailingIfNeeded(node);
                } else {
                    if (clusterNodeDelFailureReport(node,sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s is back online.",
                            sender->name, node->name);
                    }
                }
            }

            /* If from our POV the node is up (no failure flags are set),
             * we have no pending ping for the node, nor we have failure
             * reports for this node, update the last pong time with the
             * one we see from the other nodes. */
            if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
                node->ping_sent == 0 &&
                clusterNodeFailureReportsCount(node) == 0)
            {
                mstime_t pongtime = ntohl(g->pong_received);
                pongtime *= 1000; /* Convert back to milliseconds. */

                /* Replace the pong time with the received one only if
                 * it's greater than our view but is not in the future
                 * (with 500 milliseconds tolerance) from the POV of our
                 * clock. */
                if (pongtime <= (server.mstime+500) &&
                    pongtime > node->pong_received)
                {
                    node->pong_received = pongtime;
                }
            }

            /* If we already know this node, but it is not reachable, and
             * we see a different address in the gossip section of a node that
             * can talk with this other node, update the address, disconnect
             * the old link if any, so that we'll attempt to connect with the
             * new address. */
            if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
                !(flags & CLUSTER_NODE_NOADDR) &&
                !(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
                (strcasecmp(node->ip,g->ip) ||
                 node->port != ntohs(g->port) ||
                 node->cport != ntohs(g->cport)))
            {
                if (node->link) freeClusterLink(node->link);
                memcpy(node->ip,g->ip,NET_IP_STR_LEN);
                node->port = ntohs(g->port);
                node->cport = ntohs(g->cport);
                node->flags &= ~CLUSTER_NODE_NOADDR;
            }
        } else {
            /* If it's not in NOADDR state and we don't have it, we
             * start a handshake process against this IP/PORT pair.
             *
             * Note that we require that the sender of this gossip message
             * is a well known node in our cluster, otherwise we risk
             * joining another cluster. */
            if (sender &&
                !(flags & CLUSTER_NODE_NOADDR) &&
                !clusterBlacklistExists(g->nodename))
            {
                clusterStartHandshake(g->ip,ntohs(g->port),ntohs(g->cport));
            }
        }

        /* Next node */
        g++;
    }
}

/* IP -> string conversion. 'buf' is supposed to at least be 46 bytes.
 * If 'announced_ip' length is non-zero, it is used instead of extracting
 * the IP from the socket peer address. */
void nodeIp2String(char *buf, clusterLink *link, char *announced_ip) {
    if (announced_ip[0] != '\0') {
        memcpy(buf,announced_ip,NET_IP_STR_LEN);
        buf[NET_IP_STR_LEN-1] = '\0'; /* We are not sure the input is sane. */
    } else {
        anetPeerToString(link->fd, buf, NET_IP_STR_LEN, NULL);
    }
}

/* Update the node address to the IP address that can be extracted
 * from link->fd, or if hdr->myip is non empty, to the address the node
 * is announcing to us. The port is taken from the packet header as well.
 *
 * If the address or port changed, disconnect the node link so that we'll
 * connect again to the new address.
 *
 * If the ip/port pair is already correct no operation is performed at
 * all.
 *
 * The function returns 0 if the node address is still the same,
 * otherwise 1 is returned. */
int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
                              clusterMsg *hdr)
{
    char ip[NET_IP_STR_LEN] = {0};
    int port = ntohs(hdr->port);
    int cport = ntohs(hdr->cport);

    /* We don't proceed if the link is the same as the sender link, as this
     * function is designed to see if the node link is consistent with the
     * symmetric link that is used to receive PINGs from the node.
     *
     * As a side effect this function never frees the passed 'link', so
     * it is safe to call during packet processing. */
    if (link == node->link) return 0;

    nodeIp2String(ip,link,hdr->myip);
    if (node->port == port && node->cport == cport &&
        strcmp(ip,node->ip) == 0) return 0;

    /* IP / port is different, update it. */
    memcpy(node->ip,ip,sizeof(ip));
    node->port = port;
    node->cport = cport;
    if (node->link) freeClusterLink(node->link);
    node->flags &= ~CLUSTER_NODE_NOADDR;
    serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d",
        node->name, node->ip, node->port);

    /* Check if this is our master and we have to change the
     * replication target as well. */
    if (nodeIsSlave(myself) && myself->slaveof == node)
        replicationSetMaster(node->ip, node->port);
    return 1;
}

/* Reconfigure the specified node 'n' as a master. This function is called
 * when a node that we believed to be a slave is now acting as master in
 * order to update the state of the node. */
void clusterSetNodeAsMaster(clusterNode *n) {
    if (nodeIsMaster(n)) return;

    if (n->slaveof) {
        clusterNodeRemoveSlave(n->slaveof,n);
        if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO;
    }
    n->flags &= ~CLUSTER_NODE_SLAVE;
    n->flags |= CLUSTER_NODE_MASTER;
    n->slaveof = NULL;

    /* Update config and state. */
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                         CLUSTER_TODO_UPDATE_STATE);
}
1494 
1495 /* This function is called when we receive a master configuration via a
1496  * PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of the
1497  * node, and the set of slots claimed under this configEpoch.
1498  *
1499  * What we do is to rebind the slots with newer configuration compared to our
1500  * local configuration, and if needed, we turn ourself into a replica of the
1501  * node (see the function comments for more info).
1502  *
1503  * The 'sender' is the node for which we received a configuration update.
1504  * Sometimes it is not actually the "Sender" of the information, like in the
1505  * case we receive the info via an UPDATE packet. */
1506 void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
1507  int j;
1508  clusterNode *curmaster, *newmaster = NULL;
1509  /* The dirty slots list is a list of slots for which we lost ownership
1510  * while still having keys inside. This usually happens after a failover
1511  * or after a manual cluster reconfiguration operated by the admin.
1512  *
1513  * If the update message is not able to demote a master to slave (in this
1514  * case we'll resync with the master updating the whole key space), we
1515  * need to delete all the keys in the slots we lost ownership of. */
1516  uint16_t dirty_slots[CLUSTER_SLOTS];
1517  int dirty_slots_count = 0;
1518 
1519  /* Here we set curmaster to this node or the node this node
1520  * replicates to if it's a slave. In the for loop we are
1521  * interested in checking whether slots are taken away from curmaster. */
1522  curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;
1523 
1524  if (sender == myself) {
1525  serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
1526  return;
1527  }
1528 
1529  for (j = 0; j < CLUSTER_SLOTS; j++) {
1530  if (bitmapTestBit(slots,j)) {
1531  /* The slot is already bound to the sender of this message. */
1532  if (server.cluster->slots[j] == sender) continue;
1533 
1534  /* The slot is in importing state, it should be modified only
1535  * manually via redis-trib (example: a resharding is in progress
1536  * and the migrating side slot was already closed and is advertising
1537  * a new config. We still want the slot to be closed manually). */
1538  if (server.cluster->importing_slots_from[j]) continue;
1539 
1540  /* We rebind the slot to the new node claiming it if:
1541  * 1) The slot was unassigned or the new node claims it with a
1542  * greater configEpoch.
1543  * 2) We are not currently importing the slot. */
1544  if (server.cluster->slots[j] == NULL ||
1545  server.cluster->slots[j]->configEpoch < senderConfigEpoch)
1546  {
1547  /* Was this slot mine, and still contains keys? Mark it as
1548  * a dirty slot. */
1549  if (server.cluster->slots[j] == myself &&
1550  countKeysInSlot(j) &&
1551  sender != myself)
1552  {
1553  dirty_slots[dirty_slots_count] = j;
1554  dirty_slots_count++;
1555  }
1556 
1557  if (server.cluster->slots[j] == curmaster)
1558  newmaster = sender;
1559  clusterDelSlot(j);
1560  clusterAddSlot(sender,j);
1561  clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1562  CLUSTER_TODO_UPDATE_STATE|
1563  CLUSTER_TODO_FSYNC_CONFIG);
1564  }
1565  }
1566  }
1567 
1568  /* If at least one slot was reassigned from a node to another node
1569  * with a greater configEpoch, it is possible that:
1570  * 1) We are a master left without slots. This means that we were
1571  * failed over and we should turn into a replica of the new
1572  * master.
1573  * 2) We are a slave and our master is left without slots. We need
1574  * to replicate to the new slots owner. */
1575  if (newmaster && curmaster->numslots == 0) {
1576  serverLog(LL_WARNING,
1577  "Configuration change detected. Reconfiguring myself "
1578  "as a replica of %.40s", sender->name);
1579  clusterSetMaster(sender);
1580  clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1581  CLUSTER_TODO_UPDATE_STATE|
1582  CLUSTER_TODO_FSYNC_CONFIG);
1583  } else if (dirty_slots_count) {
1584  /* If we are here, we received an update message which removed
1585  * ownership of certain slots we still have keys about, but we are
1586  * still serving some slots, so this master node was not demoted to
1587  * a slave.
1588  *
1589  * In order to maintain a consistent state between keys and slots
1590  * we need to remove all the keys from the slots we lost. */
1591  for (j = 0; j < dirty_slots_count; j++)
1592  delKeysInSlot(dirty_slots[j]);
1593  }
1594 }
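
/* For reference, the slot bitmaps tested above via bitmapTestBit() are plain
 * bit arrays of CLUSTER_SLOTS bits, one bit per hash slot. A minimal sketch
 * of such a test under that assumption (the helper name is illustrative): */
static int slotBitmapTestSketch(const unsigned char *bitmap, int pos) {
    int byte = pos / 8; /* Byte that holds the bit for slot 'pos'. */
    int bit = pos & 7;  /* Offset of the bit inside that byte. */
    return (bitmap[byte] & (1 << bit)) != 0;
}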
1595 
1596 /* When this function is called, there is a packet to process starting
1597  * at node->rcvbuf. Releasing the buffer is up to the caller, so this
1598  * function should just handle the higher level stuff of processing the
1599  * packet, modifying the cluster state if needed.
1600  *
1601  * The function returns 1 if the link is still valid after the packet
1602  * was processed, otherwise 0 if the link was freed since the packet
1603  * processing led to some inconsistency error (for instance a PONG
1604  * received from the wrong sender ID). */
1605 int clusterProcessPacket(clusterLink *link) {
1606  clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
1607  uint32_t totlen = ntohl(hdr->totlen);
1608  uint16_t type = ntohs(hdr->type);
1609 
1610  if (type < CLUSTERMSG_TYPE_COUNT)
1611  server.cluster->stats_bus_messages_received[type]++;
1612  serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes",
1613  type, (unsigned long) totlen);
1614 
1615  /* Perform sanity checks */
1616  if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
1617  if (totlen > sdslen(link->rcvbuf)) return 1;
1618 
1619  if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
1620  /* Can't handle messages of different versions. */
1621  return 1;
1622  }
1623 
1624  uint16_t flags = ntohs(hdr->flags);
1625  uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
1626  clusterNode *sender;
1627 
1628  if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1629  type == CLUSTERMSG_TYPE_MEET)
1630  {
1631  uint16_t count = ntohs(hdr->count);
1632  uint32_t explen; /* expected length of this packet */
1633 
1634  explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1635  explen += (sizeof(clusterMsgDataGossip)*count);
1636  if (totlen != explen) return 1;
1637  } else if (type == CLUSTERMSG_TYPE_FAIL) {
1638  uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1639 
1640  explen += sizeof(clusterMsgDataFail);
1641  if (totlen != explen) return 1;
1642  } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
1643  uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1644 
1645  explen += sizeof(clusterMsgDataPublish) -
1646  8 +
1647  ntohl(hdr->data.publish.msg.channel_len) +
1648  ntohl(hdr->data.publish.msg.message_len);
1649  if (totlen != explen) return 1;
1650  } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
1651  type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
1652  type == CLUSTERMSG_TYPE_MFSTART)
1653  {
1654  uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1655 
1656  if (totlen != explen) return 1;
1657  } else if (type == CLUSTERMSG_TYPE_UPDATE) {
1658  uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1659 
1660  explen += sizeof(clusterMsgDataUpdate);
1661  if (totlen != explen) return 1;
1662  }
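
 /* Worked example of the checks above: a PING carrying 3 gossip entries
 * is accepted only if totlen == sizeof(clusterMsg) -
 * sizeof(union clusterMsgData) + 3*sizeof(clusterMsgDataGossip); any other
 * length makes us return 1, dropping the packet but keeping the link. */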
1663 
1664  /* Check if the sender is a known node. */
1665  sender = clusterLookupNode(hdr->sender);
1666  if (sender && !nodeInHandshake(sender)) {
1667  /* Update our currentEpoch if we see a newer epoch in the cluster. */
1668  senderCurrentEpoch = ntohu64(hdr->currentEpoch);
1669  senderConfigEpoch = ntohu64(hdr->configEpoch);
1670  if (senderCurrentEpoch > server.cluster->currentEpoch)
1671  server.cluster->currentEpoch = senderCurrentEpoch;
1672  /* Update the sender configEpoch if it is publishing a newer one. */
1673  if (senderConfigEpoch > sender->configEpoch) {
1674  sender->configEpoch = senderConfigEpoch;
1675  clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1676  CLUSTER_TODO_FSYNC_CONFIG);
1677  }
1678  /* Update the replication offset info for this node. */
1679  sender->repl_offset = ntohu64(hdr->offset);
1680  sender->repl_offset_time = mstime();
1681  /* If we are a slave performing a manual failover and our master
1682  * sent its offset while already paused, populate the MF state. */
1683  if (server.cluster->mf_end &&
1684  nodeIsSlave(myself) &&
1685  myself->slaveof == sender &&
1686  hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
1687  server.cluster->mf_master_offset == 0)
1688  {
1689  server.cluster->mf_master_offset = sender->repl_offset;
1690  serverLog(LL_WARNING,
1691  "Received replication offset for paused "
1692  "master manual failover: %lld",
1693  server.cluster->mf_master_offset);
1694  }
1695  }
1696 
1697  /* Initial processing of PING and MEET requests replying with a PONG. */
1698  if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
1699  serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);
1700 
1701  /* We use incoming MEET messages in order to set the address
1702  * for 'myself', since only other cluster nodes will send us
1703  * MEET messages on handshakes, when the cluster joins, or
1704  * later if we changed address, and those nodes will use our
1705  * official address to connect to us. So obtaining this address
1706  * from the socket is a simple way to discover / update our own
1707  * address in the cluster without it being hardcoded in the config.
1708  *
1709  * However if we don't have an address at all, we update the address
1710  * even with a normal PING packet. If it's wrong it will be fixed
1711  * by MEET later. */
1712  if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
1713  server.cluster_announce_ip == NULL)
1714  {
1715  char ip[NET_IP_STR_LEN];
1716 
1717  if (anetSockName(link->fd,ip,sizeof(ip),NULL) != -1 &&
1718  strcmp(ip,myself->ip))
1719  {
1720  memcpy(myself->ip,ip,NET_IP_STR_LEN);
1721  serverLog(LL_WARNING,"IP address for this node updated to %s",
1722  myself->ip);
1723  clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1724  }
1725  }
1726 
1727  /* Add this node if it is new for us and the msg type is MEET.
1728  * In this stage we don't try to add the node with the right
1729  * flags, slaveof pointer, and so forth, as these details will be
1730  * resolved when we receive PONGs from the node. */
1731  if (!sender && type == CLUSTERMSG_TYPE_MEET) {
1732  clusterNode *node;
1733 
1734  node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
1735  nodeIp2String(node->ip,link,hdr->myip);
1736  node->port = ntohs(hdr->port);
1737  node->cport = ntohs(hdr->cport);
1738  clusterAddNode(node);
1739  clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1740  }
1741 
1742  /* If this is a MEET packet from an unknown node, we still process
1743  * the gossip section here since we have to trust the sender because
1744  * of the message type. */
1745  if (!sender && type == CLUSTERMSG_TYPE_MEET)
1746  clusterProcessGossipSection(hdr,link);
1747 
1748  /* Anyway reply with a PONG */
1749  clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
1750  }
1751 
1752  /* PING, PONG, MEET: process config information. */
1753  if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1754  type == CLUSTERMSG_TYPE_MEET)
1755  {
1756  serverLog(LL_DEBUG,"%s packet received: %p",
1757  type == CLUSTERMSG_TYPE_PING ? "ping" : "pong",
1758  (void*)link->node);
1759  if (link->node) {
1760  if (nodeInHandshake(link->node)) {
1761  /* If we already have this node, try to change the
1762  * IP/port of the node with the new one. */
1763  if (sender) {
1764  serverLog(LL_VERBOSE,
1765  "Handshake: we already know node %.40s, "
1766  "updating the address if needed.", sender->name);
1767  if (nodeUpdateAddressIfNeeded(sender,link,hdr))
1768  {
1769  clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1770  CLUSTER_TODO_UPDATE_STATE);
1771  }
1772  /* Free this node as we already have it. This will
1773  * cause the link to be freed as well. */
1774  clusterDelNode(link->node);
1775  return 0;
1776  }
1777 
1778  /* The first thing to do is replace the random name with the
1779  * right node name if this was a handshake stage. */
1780  clusterRenameNode(link->node, hdr->sender);
1781  serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
1782  link->node->name);
1783  link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
1784  link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
1785  clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1786  } else if (memcmp(link->node->name,hdr->sender,
1787  CLUSTER_NAMELEN) != 0)
1788  {
1789  /* If the reply has a non-matching node ID we
1790  * disconnect this node and set it as not having an associated
1791  * address. */
1792  serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
1793  link->node->name,
1794  (int)(mstime()-(link->node->ctime)),
1795  link->node->flags);
1796  link->node->flags |= CLUSTER_NODE_NOADDR;
1797  link->node->ip[0] = '\0';
1798  link->node->port = 0;
1799  link->node->cport = 0;
1800  freeClusterLink(link);
1801  clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1802  return 0;
1803  }
1804  }
1805 
1806  /* Update the node address if it changed. */
1807  if (sender && type == CLUSTERMSG_TYPE_PING &&
1808  !nodeInHandshake(sender) &&
1809  nodeUpdateAddressIfNeeded(sender,link,hdr))
1810  {
1811  clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1812  CLUSTER_TODO_UPDATE_STATE);
1813  }
1814 
1815  /* Update our info about the node */
1816  if (link->node && type == CLUSTERMSG_TYPE_PONG) {
1817  link->node->pong_received = mstime();
1818  link->node->ping_sent = 0;
1819 
1820  /* The PFAIL condition can be reversed without external
1821  * help if it is momentary (that is, if it does not
1822  * turn into a FAIL state).
1823  *
1824  * The FAIL condition is also reversible under specific
1825  * conditions detected by clearNodeFailureIfNeeded(). */
1826  if (nodeTimedOut(link->node)) {
1827  link->node->flags &= ~CLUSTER_NODE_PFAIL;
1828  clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1829  CLUSTER_TODO_UPDATE_STATE);
1830  } else if (nodeFailed(link->node)) {
1831  clearNodeFailureIfNeeded(link->node);
1832  }
1833  }
1834 
1835  /* Check for role switch: slave -> master or master -> slave. */
1836  if (sender) {
1837  if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
1838  sizeof(hdr->slaveof)))
1839  {
1840  /* Node is a master. */
1841  clusterSetNodeAsMaster(sender);
1842  } else {
1843  /* Node is a slave. */
1844  clusterNode *master = clusterLookupNode(hdr->slaveof);
1845 
1846  if (nodeIsMaster(sender)) {
1847  /* Master turned into a slave! Reconfigure the node. */
1848  clusterDelNodeSlots(sender);
1849  sender->flags &= ~(CLUSTER_NODE_MASTER|
1850  CLUSTER_NODE_MIGRATE_TO);
1851  sender->flags |= CLUSTER_NODE_SLAVE;
1852 
1853  /* Update config and state. */
1854  clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1855  CLUSTER_TODO_UPDATE_STATE);
1856  }
1857 
1858  /* Master node changed for this slave? */
1859  if (master && sender->slaveof != master) {
1860  if (sender->slaveof)
1861  clusterNodeRemoveSlave(sender->slaveof,sender);
1862  clusterNodeAddSlave(master,sender);
1863  sender->slaveof = master;
1864 
1865  /* Update config. */
1866  clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1867  }
1868  }
1869  }
1870 
1871  /* Update our info about served slots.
1872  *
1873  * Note: this MUST happen after we update the master/slave state
1874  * so that the CLUSTER_NODE_MASTER flag will be set. */
1875 
1876  /* Many checks are only needed if the set of served slots this
1877  * instance claims is different compared to the set of slots we have
1878  * for it. Check this ASAP to avoid other computationally expensive
1879  * checks later. */
1880  clusterNode *sender_master = NULL; /* Sender or its master if slave. */
1881  int dirty_slots = 0; /* Sender claimed slots don't match my view? */
1882 
1883  if (sender) {
1884  sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
1885  if (sender_master) {
1886  dirty_slots = memcmp(sender_master->slots,
1887  hdr->myslots,sizeof(hdr->myslots)) != 0;
1888  }
1889  }
1890 
1891  /* 1) If the sender of the message is a master, and we detected that
1892  * the set of slots it claims changed, scan the slots to see if we
1893  * need to update our configuration. */
1894  if (sender && nodeIsMaster(sender) && dirty_slots)
1895  clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
1896 
1897  /* 2) We also check for the reverse condition, that is, the sender
1898  * claims to serve slots we know are served by a master with a
1899  * greater configEpoch. If this happens we inform the sender.
1900  *
1901  * This is useful because sometimes after a partition heals, a
1902  * reappearing master may be the last one to claim a given set of
1903  * hash slots, but with a configuration that other instances know to
1904  * be deprecated. Example:
1905  *
1906  * A and B are master and slave for slots 1,2,3.
1907  * A is partitioned away, B gets promoted.
1908  * B is partitioned away, and A returns available.
1909  *
1910  * Usually B would PING A publishing its set of served slots and its
1911  * configEpoch, but because of the partition B can't inform A of the
1912  * new configuration, so other nodes that have an updated table must
1913  * do it. In this way A will stop acting as a master (or can try to
1914  * failover if the conditions to win the election are met). */
1915  if (sender && dirty_slots) {
1916  int j;
1917 
1918  for (j = 0; j < CLUSTER_SLOTS; j++) {
1919  if (bitmapTestBit(hdr->myslots,j)) {
1920  if (server.cluster->slots[j] == sender ||
1921  server.cluster->slots[j] == NULL) continue;
1922  if (server.cluster->slots[j]->configEpoch >
1923  senderConfigEpoch)
1924  {
1925  serverLog(LL_VERBOSE,
1926  "Node %.40s has old slots configuration, sending "
1927  "an UPDATE message about %.40s",
1928  sender->name, server.cluster->slots[j]->name);
1929  clusterSendUpdate(sender->link,
1930  server.cluster->slots[j]);
1931 
1932  /* TODO: instead of exiting the loop send every other
1933  * UPDATE packet for other nodes that are the new owner
1934  * of sender's slots. */
1935  break;
1936  }
1937  }
1938  }
1939  }
1940 
1941  /* If our config epoch collides with the sender's, try to fix
1942  * the problem. */
1943  if (sender &&
1944  nodeIsMaster(myself) && nodeIsMaster(sender) &&
1945  senderConfigEpoch == myself->configEpoch)
1946  {
1947  clusterHandleConfigEpochCollision(sender);
1948  }
1949 
1950  /* Get info from the gossip section */
1951  if (sender) clusterProcessGossipSection(hdr,link);
1952  } else if (type == CLUSTERMSG_TYPE_FAIL) {
1953  clusterNode *failing;
1954 
1955  if (sender) {
1956  failing = clusterLookupNode(hdr->data.fail.about.nodename);
1957  if (failing &&
1958  !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
1959  {
1960  serverLog(LL_NOTICE,
1961  "FAIL message received from %.40s about %.40s",
1962  hdr->sender, hdr->data.fail.about.nodename);
1963  failing->flags |= CLUSTER_NODE_FAIL;
1964  failing->fail_time = mstime();
1965  failing->flags &= ~CLUSTER_NODE_PFAIL;
1966  clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1967  CLUSTER_TODO_UPDATE_STATE);
1968  }
1969  } else {
1970  serverLog(LL_NOTICE,
1971  "Ignoring FAIL message from unknown node %.40s about %.40s",
1972  hdr->sender, hdr->data.fail.about.nodename);
1973  }
1974  } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
1975  robj *channel, *message;
1976  uint32_t channel_len, message_len;
1977 
1978  /* Don't bother creating useless objects if there are no
1979  * Pub/Sub subscribers. */
1980  if (dictSize(server.pubsub_channels) ||
1981  listLength(server.pubsub_patterns))
1982  {
1983  channel_len = ntohl(hdr->data.publish.msg.channel_len);
1984  message_len = ntohl(hdr->data.publish.msg.message_len);
1985  channel = createStringObject(
1986  (char*)hdr->data.publish.msg.bulk_data,channel_len);
1987  message = createStringObject(
1988  (char*)hdr->data.publish.msg.bulk_data+channel_len,
1989  message_len);
1990  pubsubPublishMessage(channel,message);
1991  decrRefCount(channel);
1992  decrRefCount(message);
1993  }
1994  } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
1995  if (!sender) return 1; /* We don't know that node. */
1996  clusterSendFailoverAuthIfNeeded(sender,hdr);
1997  } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
1998  if (!sender) return 1; /* We don't know that node. */
1999  /* We consider this vote only if the sender is a master serving
2000  * a non zero number of slots, and its currentEpoch is greater than
2001  * or equal to the epoch in which this node started the election. */
2002  if (nodeIsMaster(sender) && sender->numslots > 0 &&
2003  senderCurrentEpoch >= server.cluster->failover_auth_epoch)
2004  {
2005  server.cluster->failover_auth_count++;
2006  /* Maybe we reached a quorum here, set a flag to make sure
2007  * we check ASAP. */
2008  clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
2009  }
2010  } else if (type == CLUSTERMSG_TYPE_MFSTART) {
2011  /* This message is acceptable only if I'm a master and the sender
2012  * is one of my slaves. */
2013  if (!sender || sender->slaveof != myself) return 1;
2014  /* Manual failover requested from slaves. Initialize the state
2015  * accordingly. */
2016  resetManualFailover();
2017  server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
2018  server.cluster->mf_slave = sender;
2019  pauseClients(mstime()+(CLUSTER_MF_TIMEOUT*2));
2020  serverLog(LL_WARNING,"Manual failover requested by slave %.40s.",
2021  sender->name);
2022  } else if (type == CLUSTERMSG_TYPE_UPDATE) {
2023  clusterNode *n; /* The node the update is about. */
2024  uint64_t reportedConfigEpoch =
2025  ntohu64(hdr->data.update.nodecfg.configEpoch);
2026 
2027  if (!sender) return 1; /* We don't know the sender. */
2028  n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
2029  if (!n) return 1; /* We don't know the reported node. */
2030  if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */
2031 
2032  /* If in our current config the node is a slave, set it as a master. */
2033  if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);
2034 
2035  /* Update the node's configEpoch. */
2036  n->configEpoch = reportedConfigEpoch;
2037  clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2038  CLUSTER_TODO_FSYNC_CONFIG);
2039 
2040  /* Check the bitmap of served slots and update our
2041  * config accordingly. */
2042  clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
2043  hdr->data.update.nodecfg.slots);
2044  } else {
2045  serverLog(LL_WARNING,"Received unknown packet type: %d", type);
2046  }
2047  return 1;
2048 }
2049 
2050 /* This function is called when we detect the link with this node is lost.
2051  We set the node as no longer connected. The Cluster Cron will detect
2052  this condition and will try to reconnect it.
2053 
2054  If instead the node is a temporary node used to accept a query, we
2055  completely free the node on error. */
2056 void handleLinkIOError(clusterLink *link) {
2057  freeClusterLink(link);
2058 }
2059 
2060 /* Send data. This is handled using a trivial send buffer that gets
2061  * consumed by write(). We don't try to optimize this for speed too much
2062  * as this is a very low traffic channel. */
2063 void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2064  clusterLink *link = (clusterLink*) privdata;
2065  ssize_t nwritten;
2066  UNUSED(el);
2067  UNUSED(mask);
2068 
2069  nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
2070  if (nwritten <= 0) {
2071  serverLog(LL_DEBUG,"I/O error writing to node link: %s",
2072  strerror(errno));
2073  handleLinkIOError(link);
2074  return;
2075  }
2076  sdsrange(link->sndbuf,nwritten,-1);
2077  if (sdslen(link->sndbuf) == 0)
2078  aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
2079 }
2080 
2081 /* Read data. Try to read the first field of the header first to check the
2082  * full length of the packet. When a whole packet is in memory this function
2083  * will call the function to process the packet. And so forth. */
2084 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2085  char buf[sizeof(clusterMsg)];
2086  ssize_t nread;
2087  clusterMsg *hdr;
2088  clusterLink *link = (clusterLink*) privdata;
2089  unsigned int readlen, rcvbuflen;
2090  UNUSED(el);
2091  UNUSED(mask);
2092 
2093  while(1) { /* Read as long as there is data to read. */
2094  rcvbuflen = sdslen(link->rcvbuf);
2095  if (rcvbuflen < 8) {
2096  /* First, obtain the first 8 bytes (the 4-byte "RCmb" signature
2097  * plus the 4-byte totlen field) to get the full message length. */
2098  readlen = 8 - rcvbuflen;
2099  } else {
2100  /* Finally read the full message. */
2101  hdr = (clusterMsg*) link->rcvbuf;
2102  if (rcvbuflen == 8) {
2103  /* Perform some sanity check on the message signature
2104  * and length. */
2105  if (memcmp(hdr->sig,"RCmb",4) != 0 ||
2106  ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
2107  {
2108  serverLog(LL_WARNING,
2109  "Bad message length or signature received "
2110  "from Cluster bus.");
2112  return;
2113  }
2114  }
2115  readlen = ntohl(hdr->totlen) - rcvbuflen;
2116  if (readlen > sizeof(buf)) readlen = sizeof(buf);
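 /* Note that readlen is capped at sizeof(buf), so a packet larger than
 * sizeof(clusterMsg) (e.g. a PING with many gossip entries) is simply
 * accumulated into link->rcvbuf across multiple read() calls. */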
2117  }
2118 
2119  nread = read(fd,buf,readlen);
2120  if (nread == -1 && errno == EAGAIN) return; /* No more data ready. */
2121 
2122  if (nread <= 0) {
2123  /* I/O error... */
2124  serverLog(LL_DEBUG,"I/O error reading from node link: %s",
2125  (nread == 0) ? "connection closed" : strerror(errno));
2126  handleLinkIOError(link);
2127  return;
2128  } else {
2129  /* Read data and recast the pointer to the new buffer. */
2130  link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
2131  hdr = (clusterMsg*) link->rcvbuf;
2132  rcvbuflen += nread;
2133  }
2134 
2135  /* Total length obtained? Process this packet. */
2136  if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
2137  if (clusterProcessPacket(link)) {
2138  sdsfree(link->rcvbuf);
2139  link->rcvbuf = sdsempty();
2140  } else {
2141  return; /* Link no longer valid. */
2142  }
2143  }
2144  }
2145 }
2146 
2147 /* Put stuff into the send buffer.
2148  *
2149  * It is guaranteed that this function will never invalidate the link
2150  * as a side effect, so it is safe to call this function
2151  * from event handlers that will do stuff with the same link later. */
2152 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
2153  if (sdslen(link->sndbuf) == 0 && msglen != 0)
2154  aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
2155  clusterWriteHandler,link);
2156 
2157  link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
2158 
2159  /* Populate sent messages stats. */
2160  clusterMsg *hdr = (clusterMsg*) msg;
2161  uint16_t type = ntohs(hdr->type);
2162  if (type < CLUSTERMSG_TYPE_COUNT)
2163  server.cluster->stats_bus_messages_sent[type]++;
2164 }
2165 
2166 /* Send a message to all the nodes that are part of the cluster having
2167  * a connected link.
2168  *
2169  * It is guaranteed that this function will never invalidate any
2170  * node->link as a side effect, so it is safe to call this function
2171  * from event handlers that will do stuff with node links later. */
2172 void clusterBroadcastMessage(void *buf, size_t len) {
2173  dictIterator *di;
2174  dictEntry *de;
2175 
2176  di = dictGetSafeIterator(server.cluster->nodes);
2177  while((de = dictNext(di)) != NULL) {
2178  clusterNode *node = dictGetVal(de);
2179 
2180  if (!node->link) continue;
2181  if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
2182  continue;
2183  clusterSendMessage(node->link,buf,len);
2184  }
2185  dictReleaseIterator(di);
2186 }
2187 
2188 /* Build the message header. hdr must point to a buffer at least
2189  * sizeof(clusterMsg) bytes long. */
2190 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
2191  int totlen = 0;
2192  uint64_t offset;
2193  clusterNode *master;
2194 
2195  /* If this node is a master, we send its slots bitmap and configEpoch.
2196  * If this node is a slave we send the master's information instead (the
2197  * node is flagged as slave so the receiver knows that it is NOT really
2198  * in charge of these slots). */
2199  master = (nodeIsSlave(myself) && myself->slaveof) ?
2200  myself->slaveof : myself;
2201 
2202  memset(hdr,0,sizeof(*hdr));
2203  hdr->ver = htons(CLUSTER_PROTO_VER);
2204  hdr->sig[0] = 'R';
2205  hdr->sig[1] = 'C';
2206  hdr->sig[2] = 'm';
2207  hdr->sig[3] = 'b';
2208  hdr->type = htons(type);
2209  memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);
2210 
2211  /* If cluster-announce-ip option is enabled, force the receivers of our
2212  * packets to use the specified address for this node. Otherwise if the
2213  * first byte is zero, they'll do auto discovery. */
2214  memset(hdr->myip,0,NET_IP_STR_LEN);
2215  if (server.cluster_announce_ip) {
2216  strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
2217  hdr->myip[NET_IP_STR_LEN-1] = '\0';
2218  }
2219 
2220  /* Handle cluster-announce-port as well. */
2221  int announced_port = server.cluster_announce_port ?
2222  server.cluster_announce_port : server.port;
2223  int announced_cport = server.cluster_announce_bus_port ?
2224  server.cluster_announce_bus_port :
2225  (server.port + CLUSTER_PORT_INCR);
2226 
2227  memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
2228  memset(hdr->slaveof,0,CLUSTER_NAMELEN);
2229  if (myself->slaveof != NULL)
2230  memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
2231  hdr->port = htons(announced_port);
2232  hdr->cport = htons(announced_cport);
2233  hdr->flags = htons(myself->flags);
2234  hdr->state = server.cluster->state;
2235 
2236  /* Set the currentEpoch and configEpochs. */
2237  hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
2238  hdr->configEpoch = htonu64(master->configEpoch);
2239 
2240  /* Set the replication offset. */
2241  if (nodeIsSlave(myself))
2242  offset = replicationGetSlaveOffset();
2243  else
2244  offset = server.master_repl_offset;
2245  hdr->offset = htonu64(offset);
2246 
2247  /* Set the message flags. */
2248  if (nodeIsMaster(myself) && server.cluster->mf_end)
2249  hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
2250 
2251  /* Compute the message length for certain messages. For other messages
2252  * this is up to the caller. */
2253  if (type == CLUSTERMSG_TYPE_FAIL) {
2254  totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2255  totlen += sizeof(clusterMsgDataFail);
2256  } else if (type == CLUSTERMSG_TYPE_UPDATE) {
2257  totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2258  totlen += sizeof(clusterMsgDataUpdate);
2259  }
2260  hdr->totlen = htonl(totlen);
2261  /* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */
2262 }
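
/* For example, after clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE) the
 * header already carries totlen == sizeof(clusterMsg) -
 * sizeof(union clusterMsgData) + sizeof(clusterMsgDataUpdate), which is
 * exactly what clusterProcessPacket() verifies on the receiving side. For
 * PING, PONG and MEET the caller fixes totlen once the gossip count is
 * known, as clusterSendPing() does below. */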
2263 
2264 /* Return non-zero if the node is already present in the gossip section of
2265  * the message pointed to by 'hdr', which has 'count' gossip entries.
2266  * Otherwise zero is returned. Helper for clusterSendPing(). */
2267 int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n) {
2268  int j;
2269  for (j = 0; j < count; j++) {
2270  if (memcmp(hdr->data.ping.gossip[j].nodename,n->name,
2271  CLUSTER_NAMELEN) == 0) break;
2272  }
2273  return j != count;
2274 }
2275 
2276 /* Set the i-th entry of the gossip section in the message pointed to by
2277  * 'hdr' to the info of the specified node 'n'. */
2278 void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
2279  clusterMsgDataGossip *gossip;
2280  gossip = &(hdr->data.ping.gossip[i]);
2281  memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
2282  gossip->ping_sent = htonl(n->ping_sent/1000);
2283  gossip->pong_received = htonl(n->pong_received/1000);
2284  memcpy(gossip->ip,n->ip,sizeof(n->ip));
2285  gossip->port = htons(n->port);
2286  gossip->cport = htons(n->cport);
2287  gossip->flags = htons(n->flags);
2288  gossip->notused1 = 0;
2289 }
2290 
2291 /* Send a PING or PONG packet to the specified node, making sure to add
2292  * enough gossip information. */
2293 void clusterSendPing(clusterLink *link, int type) {
2294  unsigned char *buf;
2295  clusterMsg *hdr;
2296  int gossipcount = 0; /* Number of gossip sections added so far. */
2297  int wanted; /* Number of gossip sections we want to append if possible. */
2298  int totlen; /* Total packet length. */
2299  /* freshnodes is the max number of nodes we can hope to append at all:
2300  * nodes available minus two (ourself and the node we are sending the
2301  * message to). However in practice there may be fewer valid nodes since
2302  * nodes in handshake state, or disconnected, are not considered. */
2303  int freshnodes = dictSize(server.cluster->nodes)-2;
2304 
2305  /* How many gossip sections do we want to add? 1/10 of the number of
2306  * nodes, and in any case at least 3. Why 1/10?
2307  *
2308  * If we have N masters, with N/10 entries, and we consider that in
2309  * node_timeout we exchange with each other node at least 4 packets
2310  * (we ping in the worst case in node_timeout/2 time, and we also
2311  * receive two pings from the host), we have a total of 8 packets
2312  * in the node_timeout*2 failure reports validity time. So we have
2313  * that, for a single PFAIL node, we can expect to receive the following
2314  * number of failure reports (in the specified window of time):
2315  *
2316  * PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
2317  *
2318  * PROB = probability of being featured in a single gossip entry,
2319  * which is 1 / NUM_OF_NODES.
2320  * ENTRIES = 10.
2321  * TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
2322  *
2323  * If we assume we have just masters (so num of nodes and num of masters
2324  * is the same), with 1/10 we always get over the majority, and specifically
2325  * 80% of the number of nodes, to account for many masters failing at the
2326  * same time.
2327  *
2328  * Since we have non-voting slaves that lower the probability of an entry
2329  * to feature our node, we set the number of entries per packet as
2330  * 10% of the total nodes we have. */
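 /* Worked example of the reasoning above, assuming N = 30 master-only
 * nodes: ENTRIES = 30/10 = 3 per packet, TOTAL_PACKETS = 2*4*30 = 240 and
 * PROB = 1/30, so the expected failure reports are (1/30)*3*240 = 24,
 * i.e. 80% of the nodes, comfortably above the majority of 16. */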
2331  wanted = floor(dictSize(server.cluster->nodes)/10);
2332  if (wanted < 3) wanted = 3;
2333  if (wanted > freshnodes) wanted = freshnodes;
2334 
2335  /* Include all the nodes in PFAIL state, so that failure reports
2336  * propagate faster, moving nodes from PFAIL to FAIL state sooner. */
2337  int pfail_wanted = server.cluster->stats_pfail_nodes;
2338 
2339  /* Compute the maximum totlen to allocate our buffer. We'll fix the totlen
2340  * later according to the number of gossip sections we really were able
2341  * to put inside the packet. */
2342  totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2343  totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
2344  /* Note: clusterBuildMessageHdr() expects the buffer to be always at least
2345  * sizeof(clusterMsg) or more. */
2346  if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
2347  buf = zcalloc(totlen);
2348  hdr = (clusterMsg*) buf;
2349 
2350  /* Populate the header. */
2351  if (link->node && type == CLUSTERMSG_TYPE_PING)
2352  link->node->ping_sent = mstime();
2353  clusterBuildMessageHdr(hdr, type);
2354 
2355  /* Populate the gossip fields */
2356  int maxiterations = wanted*3;
2357  while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
2358  dictEntry *de = dictGetRandomKey(server.cluster->nodes);
2359  clusterNode *this = dictGetVal(de);
2360 
2361  /* Don't include this node: the whole packet header is about us
2362  * already, so we just gossip about other nodes. */
2363  if (this == myself) continue;
2364 
2365  /* PFAIL nodes will be added later. */
2366  if (this->flags & CLUSTER_NODE_PFAIL) continue;
2367 
2368  /* In the gossip section don't include:
2369  * 1) Nodes in HANDSHAKE state.
2370  * 2) Nodes with the NOADDR flag set.
2371  * 3) Disconnected nodes if they don't have configured slots.
2372  */
2373  if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
2374  (this->link == NULL && this->numslots == 0))
2375  {
2376  freshnodes--; /* Technically not correct, but saves CPU. */
2377  continue;
2378  }
2379 
2380  /* Do not add a node we already have. */
2381  if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;
2382 
2383  /* Add it */
2384  clusterSetGossipEntry(hdr,gossipcount,this);
2385  freshnodes--;
2386  gossipcount++;
2387  }
2388 
2389  /* If there are PFAIL nodes, add them at the end. */
2390  if (pfail_wanted) {
2391  dictIterator *di;
2392  dictEntry *de;
2393 
2394  di = dictGetSafeIterator(server.cluster->nodes);
2395  while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
2396  clusterNode *node = dictGetVal(de);
2397  if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
2398  if (node->flags & CLUSTER_NODE_NOADDR) continue;
2399  if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
2400  clusterSetGossipEntry(hdr,gossipcount,node);
2401  freshnodes--;
2402  gossipcount++;
2403  /* We take the count of the gossip entries we allocated, since the
2404  * PFAIL stats may not match perfectly with the current number
2405  * of PFAIL nodes. */
2406  pfail_wanted--;
2407  }
2408  dictReleaseIterator(di);
2409  }
2410 
2411  /* Ready to send... fix the totlen field and queue the message in the
2412  * output buffer. */
2413  totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2414  totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
2415  hdr->count = htons(gossipcount);
2416  hdr->totlen = htonl(totlen);
2417  clusterSendMessage(link,buf,totlen);
2418  zfree(buf);
2419 }
2420 
2421 /* Send a PONG packet to every connected node that's not in handshake state
2422  * and for which we have a valid link.
2423  *
2424  * In Redis Cluster pongs are not used just for failure detection, but also
2425  * to carry important configuration information. So broadcasting a pong is
2426  * useful when something changes in the configuration and we want to make
2427  * the cluster aware ASAP (for instance after a slave promotion).
2428  *
2429  * The 'target' argument specifies the receiving instances using the
2430  * defines below:
2431  *
2432  * CLUSTER_BROADCAST_ALL -> All known instances.
2433  * CLUSTER_BROADCAST_LOCAL_SLAVES -> All slaves in my master-slaves ring.
2434  */
2435 #define CLUSTER_BROADCAST_ALL 0
2436 #define CLUSTER_BROADCAST_LOCAL_SLAVES 1
2437 void clusterBroadcastPong(int target) {
2438  dictIterator *di;
2439  dictEntry *de;
2440 
2441  di = dictGetSafeIterator(server.cluster->nodes);
2442  while((de = dictNext(di)) != NULL) {
2443  clusterNode *node = dictGetVal(de);
2444 
2445  if (!node->link) continue;
2446  if (node == myself || nodeInHandshake(node)) continue;
2447  if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) {
2448  int local_slave =
2449  nodeIsSlave(node) && node->slaveof &&
2450  (node->slaveof == myself || node->slaveof == myself->slaveof);
2451  if (!local_slave) continue;
2452  }
2453  clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
2454  }
2455  dictReleaseIterator(di);
2456 }
2457 
2458 /* Send a PUBLISH message.
2459  *
2460  * If link is NULL, then the message is broadcasted to the whole cluster. */
2461 void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
2462  unsigned char buf[sizeof(clusterMsg)], *payload;
2463  clusterMsg *hdr = (clusterMsg*) buf;
2464  uint32_t totlen;
2465  uint32_t channel_len, message_len;
2466 
2467  channel = getDecodedObject(channel);
2468  message = getDecodedObject(message);
2469  channel_len = sdslen(channel->ptr);
2470  message_len = sdslen(message->ptr);
2471 
2472  clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
2473  totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2474  totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
2475 
2476  hdr->data.publish.msg.channel_len = htonl(channel_len);
2477  hdr->data.publish.msg.message_len = htonl(message_len);
2478  hdr->totlen = htonl(totlen);
2479 
2480  /* Try to use the local buffer if possible */
2481  if (totlen < sizeof(buf)) {
2482  payload = buf;
2483  } else {
2484  payload = zmalloc(totlen);
2485  memcpy(payload,hdr,sizeof(*hdr));
2486  hdr = (clusterMsg*) payload;
2487  }
2488  memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
2489  memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
2490  message->ptr,sdslen(message->ptr));
2491 
2492  if (link)
2493  clusterSendMessage(link,payload,totlen);
2494  else
2495  clusterBroadcastMessage(payload,totlen);
2496 
2497  decrRefCount(channel);
2498  decrRefCount(message);
2499  if (payload != buf) zfree(payload);
2500 }
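
/* Wire layout of the PUBLISH payload built above: channel_len and
 * message_len travel as 32-bit network-order fields, while bulk_data holds
 * the raw channel bytes immediately followed by the raw message bytes, with
 * no separator; the receiver splits the two using channel_len, as done in
 * clusterProcessPacket() above. */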
2501 
2502 /* Send a FAIL message to all the nodes we are able to contact.
2503  * The FAIL message is sent when we detect that a node is failing
2504  * (CLUSTER_NODE_PFAIL) and we also receive a gossip confirmation of this:
2505  * we switch the node state to CLUSTER_NODE_FAIL and ask all the other
2506  * nodes to do the same ASAP. */
2507 void clusterSendFail(char *nodename) {
2508  unsigned char buf[sizeof(clusterMsg)];
2509  clusterMsg *hdr = (clusterMsg*) buf;
2510 
2511  clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
2512  memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
2513  clusterBroadcastMessage(buf,ntohl(hdr->totlen));
2514 }
2515 
2516 /* Send an UPDATE message to the specified link carrying the specified 'node'
2517  * slots configuration. The node name, slots bitmap, and configEpoch info
2518  * are included. */
2519 void clusterSendUpdate(clusterLink *link, clusterNode *node) {
2520  unsigned char buf[sizeof(clusterMsg)];
2521  clusterMsg *hdr = (clusterMsg*) buf;
2522 
2523  if (link == NULL) return;
2524  clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE);
2525  memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
2526  hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
2527  memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
2528  clusterSendMessage(link,buf,ntohl(hdr->totlen));
2529 }
2530 
2531 /* -----------------------------------------------------------------------------
2532  * CLUSTER Pub/Sub support
2533  *
2534  * For now we do very little, just propagating PUBLISH messages across the whole
2535  * cluster. In the future we'll try to get smarter and avoid propagating
2536  * those messages to hosts without receivers for a given channel.
2537  * -------------------------------------------------------------------------- */
2538 void clusterPropagatePublish(robj *channel, robj *message) {
2539  clusterSendPublish(NULL, channel, message);
2540 }
2541 
2542 /* -----------------------------------------------------------------------------
2543  * SLAVE node specific functions
2544  * -------------------------------------------------------------------------- */
2545 
2546 /* This function sends a FAILOVER_AUTH_REQUEST message to every node in order to
2547  * see if there is the quorum for this slave instance to failover its failing
2548  * master.
2549  *
2550  * Note that we send the failover request to everybody, master and slave nodes,
2551  * but only the masters are supposed to reply to our query. */
2552 void clusterRequestFailoverAuth(void) {
2553  unsigned char buf[sizeof(clusterMsg)];
2554  clusterMsg *hdr = (clusterMsg*) buf;
2555  uint32_t totlen;
2556 
2557  clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
2558  /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
2559  * in the header to communicate to the nodes receiving the message that
2560  * they should authorize the failover even if the master is working. */
2561  if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
2562  totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2563  hdr->totlen = htonl(totlen);
2564  clusterBroadcastMessage(buf,totlen);
2565 }
2566 
2567 /* Send a FAILOVER_AUTH_ACK message to the specified node. */
2568 void clusterSendFailoverAuth(clusterNode *node) {
2569  unsigned char buf[sizeof(clusterMsg)];
2570  clusterMsg *hdr = (clusterMsg*) buf;
2571  uint32_t totlen;
2572 
2573  if (!node->link) return;
2574  clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
2575  totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2576  hdr->totlen = htonl(totlen);
2577  clusterSendMessage(node->link,buf,totlen);
2578 }
2579 
2580 /* Send a MFSTART message to the specified node. */
2581 void clusterSendMFStart(clusterNode *node) {
2582  unsigned char buf[sizeof(clusterMsg)];
2583  clusterMsg *hdr = (clusterMsg*) buf;
2584  uint32_t totlen;
2585 
2586  if (!node->link) return;
2587  clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
2588  totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2589  hdr->totlen = htonl(totlen);
2590  clusterSendMessage(node->link,buf,totlen);
2591 }
2592 
2593 /* Vote for the node asking for our vote, if the conditions are met. */
2594 void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
2595  clusterNode *master = node->slaveof;
2596  uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
2597  uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
2598  unsigned char *claimed_slots = request->myslots;
2599  int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
2600  int j;
2601 
2602  /* If we are not a master serving at least 1 slot, we don't have the
2603  * right to vote, as the cluster size in Redis Cluster is the number
2604  * of masters serving at least one slot, and the quorum is
2605  * (cluster size / 2) + 1. */
2606  if (nodeIsSlave(myself) || myself->numslots == 0) return;
2607 
2608  /* Request epoch must be >= our currentEpoch.
2609  * Note that it is impossible for it to actually be greater since
2610  * our currentEpoch was updated as a side effect of receiving this
2611  * request, if the request epoch was greater. */
2612  if (requestCurrentEpoch < server.cluster->currentEpoch) {
2613  serverLog(LL_WARNING,
2614  "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
2615  node->name,
2616  (unsigned long long) requestCurrentEpoch,
2617  (unsigned long long) server.cluster->currentEpoch);
2618  return;
2619  }
2620 
2621  /* I already voted for this epoch? Return ASAP. */
2622  if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
2623  serverLog(LL_WARNING,
2624  "Failover auth denied to %.40s: already voted for epoch %llu",
2625  node->name,
2626  (unsigned long long) server.cluster->currentEpoch);
2627  return;
2628  }
2629 
2630  /* Node must be a slave and its master down.
2631  * The master can be non failing if the request is flagged
2632  * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
2633  if (nodeIsMaster(node) || master == NULL ||
2634  (!nodeFailed(master) && !force_ack))
2635  {
2636  if (nodeIsMaster(node)) {
2637  serverLog(LL_WARNING,
2638  "Failover auth denied to %.40s: it is a master node",
2639  node->name);
2640  } else if (master == NULL) {
2641  serverLog(LL_WARNING,
2642  "Failover auth denied to %.40s: I don't know its master",
2643  node->name);
2644  } else if (!nodeFailed(master)) {
2645  serverLog(LL_WARNING,
2646  "Failover auth denied to %.40s: its master is up",
2647  node->name);
2648  }
2649  return;
2650  }
2651 
2652  /* We must not have voted for a slave of this master within two
2653  * times the node timeout. This is not strictly needed for correctness
2654  * of the algorithm but makes the base case more linear. */
2655  if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
2656  {
2657  serverLog(LL_WARNING,
2658  "Failover auth denied to %.40s: "
2659  "can't vote about this master before %lld milliseconds",
2660  node->name,
2661  (long long) ((server.cluster_node_timeout*2)-
2662  (mstime() - node->slaveof->voted_time)));
2663  return;
2664  }
2665 
2666  /* The slave requesting the vote must have a configEpoch for the claimed
2667  * slots that is >= the one of the masters currently serving the same
2668  * slots in the current configuration. */
2669  for (j = 0; j < CLUSTER_SLOTS; j++) {
2670  if (bitmapTestBit(claimed_slots, j) == 0) continue;
2671  if (server.cluster->slots[j] == NULL ||
2672  server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
2673  {
2674  continue;
2675  }
2676  /* If we reached this point we found a slot that in our current config
2677  * is served by a master with a greater configEpoch than the one claimed
2678  * by the slave requesting our vote. Refuse to vote for this slave. */
2679  serverLog(LL_WARNING,
2680  "Failover auth denied to %.40s: "
2681  "slot %d epoch (%llu) > reqEpoch (%llu)",
2682  node->name, j,
2683  (unsigned long long) server.cluster->slots[j]->configEpoch,
2684  (unsigned long long) requestConfigEpoch);
2685  return;
2686  }
2687 
2688  /* We can vote for this slave. */
2689  clusterSendFailoverAuth(node);
2690  server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
2691  node->slaveof->voted_time = mstime();
2692  serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
2693  node->name, (unsigned long long) server.cluster->currentEpoch);
2694 }
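
/* To recap, the function above grants a vote only when all of the following
 * hold: we are a master serving at least one slot; the request epoch is not
 * older than our currentEpoch; we did not already vote in this epoch; the
 * requester is a slave of a master in FAIL state (or FORCEACK is set); the
 * per-master cooldown of two node timeouts has passed; and no claimed slot
 * is served locally by a master with a greater configEpoch. */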
2695 
2696 /* This function returns the "rank" of this instance, a slave, in the context
2697  * of its master-slaves ring. The rank of the slave is given by the number of
2698  * other slaves for the same master that have a better replication offset
2699  * compared to the local one (better means, greater, so they claim more data).
2700  *
2701  * A slave with rank 0 is the one with the greatest (most up to date)
2702  * replication offset, and so forth. Note that because of how the rank is
2703  * computed, multiple slaves may have the same rank when they have the same offset.
2704  *
2705  * The slave rank is used to add a delay to start an election in order to
2706  * get voted and replace a failing master. Slaves with better replication
2707  * offsets are more likely to win. */
2708 int clusterGetSlaveRank(void) {
2709  long long myoffset;
2710  int j, rank = 0;
2711  clusterNode *master;
2712 
2713  serverAssert(nodeIsSlave(myself));
2714  master = myself->slaveof;
2715  if (master == NULL) return 0; /* Never called by slaves without master. */
2716 
2717  myoffset = replicationGetSlaveOffset();
2718  for (j = 0; j < master->numslaves; j++)
2719  if (master->slaves[j] != myself &&
2720  master->slaves[j]->repl_offset > myoffset) rank++;
2721  return rank;
2722 }
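
/* Worked example: for slaves of the same master with replication offsets
 * {100, 90, 90, 80}, the slave at offset 100 has rank 0, both slaves at 90
 * have rank 1 (a tie), and the slave at 80 has rank 3, so it will wait the
 * longest before trying to get elected. */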
2723 
2724 /* This function is called by clusterHandleSlaveFailover() in order to
2725  * let the slave log why it is not able to failover. Sometimes the
2726  * conditions are not met, but since the failover function is called again
2727  * and again, we can't log the same things continuously.
2728  *
2729  * This function works by logging only if a given set of conditions are
2730  * true:
2731  *
2732  * 1) The reason for which the failover can't be initiated changed.
2733  * The reasons also include a NONE reason we reset the state to
2734  * when the slave finds that its master is fine (no FAIL flag).
2735  * 2) Also, the log is emitted again if the master is still down and
2736  * the reason for not failing over is still the same, but more than
2737  * CLUSTER_CANT_FAILOVER_RELOG_PERIOD seconds elapsed.
2738  * 3) Finally, the function only logs if the master has been down for
2739  * more than five seconds + NODE_TIMEOUT. This way nothing is logged
2740  * when a failover starts in a reasonable time.
2741  *
2742  * The function is called with the reason why the slave can't failover
2743  * which is one of the integer macros CLUSTER_CANT_FAILOVER_*.
2744  *
2745  * The function is guaranteed to be called only if 'myself' is a slave. */
2746 void clusterLogCantFailover(int reason) {
2747  char *msg;
2748  static time_t lastlog_time = 0;
2749  mstime_t nolog_fail_time = server.cluster_node_timeout + 5000;
2750 
2751  /* Don't log if we have the same reason for some time. */
2752  if (reason == server.cluster->cant_failover_reason &&
2753  time(NULL)-lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
2754  return;
2755 
2756  server.cluster->cant_failover_reason = reason;
2757 
2758  /* We also don't emit any log if the master failed not long ago; the
2759  * goal of this function is to log slaves in a stalled condition for
2760  * a long time. */
2761  if (myself->slaveof &&
2762  nodeFailed(myself->slaveof) &&
2763  (mstime() - myself->slaveof->fail_time) < nolog_fail_time) return;
2764 
2765  switch(reason) {
2766  case CLUSTER_CANT_FAILOVER_DATA_AGE:
2767  msg = "Disconnected from master for longer than allowed. "
2768  "Please check the 'cluster-slave-validity-factor' configuration "
2769  "option.";
2770  break;
2771  case CLUSTER_CANT_FAILOVER_WAITING_DELAY:
2772  msg = "Waiting the delay before I can start a new failover.";
2773  break;
2774  case CLUSTER_CANT_FAILOVER_EXPIRED:
2775  msg = "Failover attempt expired.";
2776  break;
2777  case CLUSTER_CANT_FAILOVER_WAITING_VOTES:
2778  msg = "Waiting for votes, but majority still not reached.";
2779  break;
2780  default:
2781  msg = "Unknown reason code.";
2782  break;
2783  }
2784  lastlog_time = time(NULL);
2785  serverLog(LL_WARNING,"Currently unable to failover: %s", msg);
2786 }
2787 
2788 /* This function implements the final part of automatic and manual failovers,
2789  * where the slave grabs its master's hash slots, and propagates the new
2790  * configuration.
2791  *
2792  * Note that it's up to the caller to be sure that the node got a new
2793  * configuration epoch already. */
2794 void clusterFailoverReplaceYourMaster(void) {
2795  int j;
2796  clusterNode *oldmaster = myself->slaveof;
2797 
2798  if (nodeIsMaster(myself) || oldmaster == NULL) return;
2799 
2800  /* 1) Turn this node into a master. */
2801  clusterSetNodeAsMaster(myself);
2802  replicationUnsetMaster();
2803 
2804  /* 2) Claim all the slots assigned to our master. */
2805  for (j = 0; j < CLUSTER_SLOTS; j++) {
2806  if (clusterNodeGetSlotBit(oldmaster,j)) {
2807  clusterDelSlot(j);
2808  clusterAddSlot(myself,j);
2809  }
2810  }
2811 
2812  /* 3) Update state and save config. */
2813  clusterUpdateState();
2814  clusterSaveConfigOrDie(1);
2815 
2816  /* 4) Pong all the other nodes so that they can update the state
2817  * accordingly and detect that we switched to master role. */
2818  clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
2819 
2820  /* 5) If there was a manual failover in progress, clear the state. */
2821  resetManualFailover();
2822 }
2823 
2824 /* This function is called if we are a slave node and our master serving
2825  * a non-zero amount of hash slots is in FAIL state.
2826  *
2827  * The goal of this function is:
2828  * 1) To check if we are able to perform a failover (is our data updated?).
2829  * 2) Try to get elected by masters.
2830  * 3) Perform the failover informing all the other nodes.
2831  */
2832 void clusterHandleSlaveFailover(void) {
2833  mstime_t data_age;
2834  mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
2835  int needed_quorum = (server.cluster->size / 2) + 1;
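 /* E.g. with a cluster size of 7 voting masters, needed_quorum
 * is 7/2 + 1 = 4. */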
2836  int manual_failover = server.cluster->mf_end != 0 &&
2837  server.cluster->mf_can_start;
2838  mstime_t auth_timeout, auth_retry_time;
2839 
2840  server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
2841 
2842  /* Compute the failover timeout (the max time we have to send votes
2843  * and wait for replies), and the failover retry time (the time to wait
2844  * before trying to get voted again).
2845  *
2846  * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
2847  * Retry is two times the Timeout.
2848  */
2849  auth_timeout = server.cluster_node_timeout*2;
2850  if (auth_timeout < 2000) auth_timeout = 2000;
2851  auth_retry_time = auth_timeout*2;
2852 
2853  /* Pre conditions to run the function, that must be met both in case
2854  * of an automatic or manual failover:
2855  * 1) We are a slave.
2856  * 2) Our master is flagged as FAIL, or this is a manual failover.
2857  * 3) It is serving slots. */
2858  if (nodeIsMaster(myself) ||
2859  myself->slaveof == NULL ||
2860  (!nodeFailed(myself->slaveof) && !manual_failover) ||
2861  myself->slaveof->numslots == 0)
2862  {
2863  /* There are no reasons to failover, so we set the reason why we
2864  * are returning without failing over to NONE. */
2865  server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
2866  return;
2867  }
2868 
2869  /* Set data_age to the number of milliseconds we are disconnected from
2870  * the master. */
2871  if (server.repl_state == REPL_STATE_CONNECTED) {
2872  data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
2873  * 1000;
2874  } else {
2875  data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
2876  }
2877 
2878  /* Remove the node timeout from the data age as it is fine that we are
2879  * disconnected from our master at least for the time it was down to be
2880  * flagged as FAIL, that's the baseline. */
2881  if (data_age > server.cluster_node_timeout)
2882  data_age -= server.cluster_node_timeout;
2883 
2884  /* Check if our data is recent enough according to the slave validity
2885  * factor configured by the user.
2886  *
2887  * Check bypassed for manual failovers. */
2888  if (server.cluster_slave_validity_factor &&
2889  data_age >
2890  (((mstime_t)server.repl_ping_slave_period * 1000) +
2891  (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
2892  {
2893  if (!manual_failover) {
2894  clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
2895  return;
2896  }
2897  }
2898 
2899  /* If the previous failover attempt timed out and the retry time has
2900  * elapsed, we can set up a new one. */
2901  if (auth_age > auth_retry_time) {
2902  server.cluster->failover_auth_time = mstime() +
2903  500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
2904  random() % 500; /* Random delay between 0 and 500 milliseconds. */
2905  server.cluster->failover_auth_count = 0;
2906  server.cluster->failover_auth_sent = 0;
2907  server.cluster->failover_auth_rank = clusterGetSlaveRank();
2908  /* We add another delay that is proportional to the slave rank.
2909  * Specifically 1 second * rank. This way slaves that probably have a
2910  * less updated replication offset are penalized. */
2911  server.cluster->failover_auth_time +=
2912  server.cluster->failover_auth_rank * 1000;
2913  /* However if this is a manual failover, no delay is needed. */
2914  if (server.cluster->mf_end) {
2915  server.cluster->failover_auth_time = mstime();
2916  server.cluster->failover_auth_rank = 0;
2917  }
2918  serverLog(LL_WARNING,
2919  "Start of election delayed for %lld milliseconds "
2920  "(rank #%d, offset %lld).",
2921  server.cluster->failover_auth_time - mstime(),
2922  server.cluster->failover_auth_rank,
2923  replicationGetSlaveOffset());
2924  /* Now that we have a scheduled election, broadcast our offset
2925  * to all the other slaves so that they'll update their offsets
2926  * if our offset is better. */
2927  clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
2928  return;
2929  }
2930 
2931  /* It is possible that we received more updated offsets from other
2932  * slaves for the same master since we computed our election delay.
2933  * Update the delay if our rank changed.
2934  *
2935  * Not performed if this is a manual failover. */
2936  if (server.cluster->failover_auth_sent == 0 &&
2937  server.cluster->mf_end == 0)
2938  {
2939  int newrank = clusterGetSlaveRank();
2940  if (newrank > server.cluster->failover_auth_rank) {
2941  long long added_delay =
2942  (newrank - server.cluster->failover_auth_rank) * 1000;
2943  server.cluster->failover_auth_time += added_delay;
2944  server.cluster->failover_auth_rank = newrank;
2945  serverLog(LL_WARNING,
2946  "Slave rank updated to #%d, added %lld milliseconds of delay.",
2947  newrank, added_delay);
2948  }
2949  }
2950 
2951  /* Return ASAP if we still can't start the election. */
2952  if (mstime() < server.cluster->failover_auth_time) {
2953  clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
2954  return;
2955  }
2956 
2957  /* Return ASAP if the election is too old to be valid. */
2958  if (auth_age > auth_timeout) {
2959  clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
2960  return;
2961  }
2962 
2963  /* Ask for votes if needed. */
2964  if (server.cluster->failover_auth_sent == 0) {
2965  server.cluster->currentEpoch++;
2966  server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
2967  serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
2968  (unsigned long long) server.cluster->currentEpoch);
2969  clusterRequestFailoverAuth();
2970  server.cluster->failover_auth_sent = 1;
2971  clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2972  CLUSTER_TODO_UPDATE_STATE|
2973  CLUSTER_TODO_FSYNC_CONFIG);
2974  return; /* Wait for replies. */
2975  }
2976 
2977  /* Check if we reached the quorum. */
2978  if (server.cluster->failover_auth_count >= needed_quorum) {
2979  /* We have the quorum, we can finally failover the master. */
2980 
2981  serverLog(LL_WARNING,
2982  "Failover election won: I'm the new master.");
2983 
2984  /* Update my configEpoch to the epoch of the election. */
2985  if (myself->configEpoch < server.cluster->failover_auth_epoch) {
2986  myself->configEpoch = server.cluster->failover_auth_epoch;
2987  serverLog(LL_WARNING,
2988  "configEpoch set to %llu after successful failover",
2989  (unsigned long long) myself->configEpoch);
2990  }
2991 
2992  /* Take responsibility for the cluster slots. */
2993  clusterFailoverReplaceYourMaster();
2994  } else {
2995  clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
2996  }
2997 }
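/* Illustrative sketch, not part of the original cluster.c: how the election
 * delay scheduled above is composed. The constants mirror the ones used in
 * clusterHandleSlaveFailover(): a fixed 500 milliseconds to let the FAIL
 * message propagate, a random 0-500 milliseconds component to desynchronize
 * the slaves, and 1000 milliseconds per rank so that slaves with older
 * replication offsets start their election later. Manual failovers skip the
 * delay entirely. */
#include <stdlib.h> /* random() */

long long exampleElectionDelay(int rank, int manual_failover) {
    if (manual_failover) return 0; /* mf_end set: start immediately. */
    return 500 + (random() % 500) + (long long)rank * 1000;
}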
2998 
2999 /* -----------------------------------------------------------------------------
3000  * CLUSTER slave migration
3001  *
3002  * Slave migration is the process that allows a slave of a master that is
3003  * already covered by at least one other slave, to "migrate" to a master
3004  * that is orphaned, that is, left with no working slaves.
3005  * ------------------------------------------------------------------------- */
3006 
3007 /* This function is responsible for deciding if this replica should be migrated
3008  * to a different (orphaned) master. It is called by the clusterCron() function
3009  * only if:
3010  *
3011  * 1) We are a slave node.
3012  * 2) It was detected that there is at least one orphaned master in
3013  * the cluster.
3014  * 3) We are a slave of one of the masters with the greatest number of
3015  * slaves.
3016  *
3017  * These checks are performed by the caller, since it iterates over the
3018  * nodes anyway, so we only spend time in clusterHandleSlaveMigration()
3019  * when definitely needed.
3020  *
3021  * The function is called with a pre-computed max_slaves, that is the max
3022  * number of working (not in FAIL state) slaves for a single master.
3023  *
3024  * Additional conditions for migration are examined inside the function.
3025  */
3026 void clusterHandleSlaveMigration(int max_slaves) {
3027  int j, okslaves = 0;
3028  clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
3029  dictIterator *di;
3030  dictEntry *de;
3031 
3032  /* Step 1: Don't migrate if the cluster state is not ok. */
3033  if (server.cluster->state != CLUSTER_OK) return;
3034 
3035  /* Step 2: Don't migrate if my master will not be left with at least
3036  * 'migration-barrier' slaves after my migration. */
3037  if (mymaster == NULL) return;
3038  for (j = 0; j < mymaster->numslaves; j++)
3039  if (!nodeFailed(mymaster->slaves[j]) &&
3040  !nodeTimedOut(mymaster->slaves[j])) okslaves++;
3041  if (okslaves <= server.cluster_migration_barrier) return;
3042 
3043  /* Step 3: Identify a candidate for migration, and check if among the
3044  * masters with the greatest number of ok slaves, I'm the one with the
3045  * smallest node ID (the "candidate slave").
3046  *
3047  * Note: this means that eventually a replica migration will occur,
3048  * since slaves that are reachable again always have their FAIL flag
3049  * cleared, so eventually there must be a candidate. At the same time
3050  * this does not mean that there are no race conditions possible (two
3051  * slaves migrating at the same time), but this is unlikely to
3052  * happen, and harmless when it happens. */
3053  candidate = myself;
3054  di = dictGetSafeIterator(server.cluster->nodes);
3055  while((de = dictNext(di)) != NULL) {
3056  clusterNode *node = dictGetVal(de);
3057  int okslaves = 0, is_orphaned = 1;
3058 
3059  /* We want to migrate only if this master is working, orphaned, and
3060  * used to have slaves, or if it failed over a master that had slaves
3061  * (MIGRATE_TO flag). This way we only migrate to instances that were
3062  * supposed to have replicas. */
3063  if (nodeIsSlave(node) || nodeFailed(node)) is_orphaned = 0;
3064  if (!(node->flags & CLUSTER_NODE_MIGRATE_TO)) is_orphaned = 0;
3065 
3066  /* Check number of working slaves. */
3067  if (nodeIsMaster(node)) okslaves = clusterCountNonFailingSlaves(node);
3068  if (okslaves > 0) is_orphaned = 0;
3069 
3070  if (is_orphaned) {
3071  if (!target && node->numslots > 0) target = node;
3072 
3073  /* Track the starting time of the orphaned condition for this
3074  * master. */
3075  if (!node->orphaned_time) node->orphaned_time = mstime();
3076  } else {
3077  node->orphaned_time = 0;
3078  }
3079 
3080  /* Check if I'm the slave candidate for the migration: attached
3081  * to a master with the maximum number of slaves and with the smallest
3082  * node ID. */
3083  if (okslaves == max_slaves) {
3084  for (j = 0; j < node->numslaves; j++) {
3085  if (memcmp(node->slaves[j]->name,
3086  candidate->name,
3087  CLUSTER_NAMELEN) < 0)
3088  {
3089  candidate = node->slaves[j];
3090  }
3091  }
3092  }
3093  }
3094  dictReleaseIterator(di);
3095 
3096  /* Step 4: perform the migration if there is a target, and if I'm the
3097  * candidate, but only if the master is continuously orphaned for a
3098  * couple of seconds, so that during failovers, we give some time to
3099  * the natural slaves of this instance to advertise their switch from
3100  * the old master to the new one. */
3101  if (target && candidate == myself &&
3102  (mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY)
3103  {
3104  serverLog(LL_WARNING,"Migrating to orphaned master %.40s",
3105  target->name);
3106  clusterSetMaster(target);
3107  }
3108 }
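/* Illustrative sketch, not part of the original cluster.c: the "smallest
 * node ID wins" rule used above to elect the candidate slave. Node names
 * are fixed-size buffers of CLUSTER_NAMELEN bytes, so a plain memcmp()
 * yields a total order; since every slave applies the same deterministic
 * rule, they all agree on the single migrating replica without extra
 * coordination. */
#include <string.h>

#define EXAMPLE_NAMELEN 40 /* Stand-in for CLUSTER_NAMELEN. */

const char *examplePickCandidate(const char *names[], int count) {
    const char *best = names[0];
    for (int i = 1; i < count; i++)
        if (memcmp(names[i], best, EXAMPLE_NAMELEN) < 0) best = names[i];
    return best;
}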
3109 
3110 /* -----------------------------------------------------------------------------
3111  * CLUSTER manual failover
3112  *
3113  * These are the important steps performed by slaves during a manual failover:
3114  * 1) The user sends the CLUSTER FAILOVER command. The failover state is initialized
3115  * setting mf_end to the millisecond unix time at which we'll abort the
3116  * attempt.
3117  * 2) The slave sends an MFSTART message to the master requesting it to pause
3118  * clients for two times the manual failover timeout CLUSTER_MF_TIMEOUT.
3119  * When master is paused for manual failover, it also starts to flag
3120  * packets with CLUSTERMSG_FLAG0_PAUSED.
3121  * 3) The slave waits for the master to send its replication offset flagged as PAUSED.
3122  * 4) If the slave receives the offset from the master, and its offset matches,
3123  * mf_can_start is set to 1, and clusterHandleSlaveFailover() will perform
3124  * the failover as usual, with the difference that the vote request
3125  * will be modified to force masters to vote for a slave that has a
3126  * working master.
3127  *
3128  * From the point of view of the master things are simpler: when a
3129  * PAUSE_CLIENTS packet is received the master sets mf_end as well and
3130  * the sender in mf_slave. During the time limit for the manual failover
3131  * the master will just send PINGs more often to this slave, flagged with
3132  * the PAUSED flag, so that the slave will set mf_master_offset when receiving
3133  * a packet from the master with this flag set.
3134  *
3135  * The goal of the manual failover is to perform a fast failover without
3136  * data loss due to the asynchronous master-slave replication.
3137  * -------------------------------------------------------------------------- */
3138 
3139 /* Reset the manual failover state. This works for both masters and slaves
3140  * as all the state about manual failover is cleared.
3141  *
3142  * The function can be used both to initialize the manual failover state at
3143  * startup or to abort a manual failover in progress. */
3144 void resetManualFailover(void) {
3145  if (server.cluster->mf_end && clientsArePaused()) {
3146  server.clients_pause_end_time = 0;
3147  clientsArePaused(); /* Just use the side effect of the function. */
3148  }
3149  server.cluster->mf_end = 0; /* No manual failover in progress. */
3150  server.cluster->mf_can_start = 0;
3151  server.cluster->mf_slave = NULL;
3152  server.cluster->mf_master_offset = 0;
3153 }
3154 
3155 /* If a manual failover timed out, abort it. */
3156 void manualFailoverCheckTimeout(void) {
3157  if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
3158  serverLog(LL_WARNING,"Manual failover timed out.");
3159  resetManualFailover();
3160  }
3161 }
3162 
3163 /* This function is called from the cluster cron function in order to go
3164  * forward with a manual failover state machine. */
3165 void clusterHandleManualFailover(void) {
3166  /* Return ASAP if no manual failover is in progress. */
3167  if (server.cluster->mf_end == 0) return;
3168 
3169  /* If mf_can_start is non-zero, the failover was already triggered so the
3170  * next steps are performed by clusterHandleSlaveFailover(). */
3171  if (server.cluster->mf_can_start) return;
3172 
3173  if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */
3174 
3175  if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
3176  /* Our replication offset matches the master replication offset
3177  * announced after clients were paused. We can start the failover. */
3178  server.cluster->mf_can_start = 1;
3179  serverLog(LL_WARNING,
3180  "All master replication stream processed, "
3181  "manual failover can start.");
3182  }
3183 }
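/* Illustrative summary, not part of the original cluster.c: the manual
 * failover state driven by the three functions above, expressed as the
 * field values a slave observes:
 *
 *   mf_end == 0                            -> no manual failover running
 *   mf_end != 0 && mf_master_offset == 0   -> waiting for the PAUSED offset
 *   mf_master_offset == our slave offset   -> mf_can_start = 1, the normal
 *                                             election in
 *                                             clusterHandleSlaveFailover()
 *                                             takes over
 *   mstime() > mf_end                      -> timeout, resetManualFailover()
 */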
3184 
3185 /* -----------------------------------------------------------------------------
3186  * CLUSTER cron job
3187  * -------------------------------------------------------------------------- */
3188 
3189 /* This is executed 10 times every second */
3190 void clusterCron(void) {
3191  dictIterator *di;
3192  dictEntry *de;
3193  int update_state = 0;
3194  int orphaned_masters; /* How many masters there are without ok slaves. */
3195  int max_slaves; /* Max number of ok slaves for a single master. */
3196  int this_slaves; /* Number of ok slaves for our master (if we are slave). */
3197  mstime_t min_pong = 0, now = mstime();
3198  clusterNode *min_pong_node = NULL;
3199  static unsigned long long iteration = 0;
3200  mstime_t handshake_timeout;
3201 
3202  iteration++; /* Number of times this function was called so far. */
3203 
3204  /* We want to keep myself->ip in sync with the cluster-announce-ip option.
3205  * The option can be set at runtime via CONFIG SET, so we periodically check
3206  * if the option changed to reflect this into myself->ip. */
3207  {
3208  static char *prev_ip = NULL;
3209  char *curr_ip = server.cluster_announce_ip;
3210  int changed = 0;
3211 
3212  if (prev_ip == NULL && curr_ip != NULL) changed = 1;
3213  if (prev_ip != NULL && curr_ip == NULL) changed = 1;
3214  if (prev_ip && curr_ip && strcmp(prev_ip,curr_ip)) changed = 1;
3215 
3216  if (changed) {
3217  prev_ip = curr_ip;
3218  if (prev_ip) prev_ip = zstrdup(prev_ip);
3219 
3220  if (curr_ip) {
3221  strncpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN);
3222  myself->ip[NET_IP_STR_LEN-1] = '\0';
3223  } else {
3224  myself->ip[0] = '\0'; /* Force autodetection. */
3225  }
3226  }
3227  }
3228 
3229  /* The handshake timeout is the time after which a handshake node that was
3230  * not turned into a normal node is removed from the nodes table. Usually it is
3231  * just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
3232  * the value of 1 second. */
3233  handshake_timeout = server.cluster_node_timeout;
3234  if (handshake_timeout < 1000) handshake_timeout = 1000;
3235 
3236  /* Check if we have disconnected nodes and re-establish the connection.
3237  * Also update a few stats while we are here, that can be used to make
3238  * better decisions in other parts of the code. */
3239  di = dictGetSafeIterator(server.cluster->nodes);
3240  server.cluster->stats_pfail_nodes = 0;
3241  while((de = dictNext(di)) != NULL) {
3242  clusterNode *node = dictGetVal(de);
3243 
3244  /* Not interested in reconnecting the link with myself or nodes
3245  * for which we have no address. */
3246  if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
3247 
3248  if (node->flags & CLUSTER_NODE_PFAIL)
3249  server.cluster->stats_pfail_nodes++;
3250 
3251  /* A Node in HANDSHAKE state has a limited lifespan equal to the
3252  * configured node timeout. */
3253  if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
3254  clusterDelNode(node);
3255  continue;
3256  }
3257 
3258  if (node->link == NULL) {
3259  int fd;
3260  mstime_t old_ping_sent;
3261  clusterLink *link;
3262 
3263  fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
3264  node->cport, NET_FIRST_BIND_ADDR);
3265  if (fd == -1) {
3266  /* We got a synchronous error from connect before
3267  * clusterSendPing() had a chance to be called.
3268  * If node->ping_sent is zero, failure detection can't work,
3269  * so we claim we actually sent a ping now (that will
3270  * be really sent as soon as the link is obtained). */
3271  if (node->ping_sent == 0) node->ping_sent = mstime();
3272  serverLog(LL_DEBUG, "Unable to connect to "
3273  "Cluster Node [%s]:%d -> %s", node->ip,
3274  node->cport, server.neterr);
3275  continue;
3276  }
3277  link = createClusterLink(node);
3278  link->fd = fd;
3279  node->link = link;
3280  aeCreateFileEvent(server.el,link->fd,AE_READABLE,
3281  clusterReadHandler,link);
3282  /* Queue a PING in the new connection ASAP: this is crucial
3283  * to avoid false positives in failure detection.
3284  *
3285  * If the node is flagged as MEET, we send a MEET message instead
3286  * of a PING one, to force the receiver to add us in its node
3287  * table. */
3288  old_ping_sent = node->ping_sent;
3289  clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
3290  CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
3291  if (old_ping_sent) {
3292  /* If there was an active ping before the link was
3293  * disconnected, we want to restore the ping time, otherwise it
3294  * would be replaced by the clusterSendPing() call. */
3295  node->ping_sent = old_ping_sent;
3296  }
3297  /* We can clear the flag after the first packet is sent.
3298  * If we'll never receive a PONG, we'll never send new packets
3299  * to this node. Instead after the PONG is received and we
3300  * are no longer in meet/handshake status, we want to send
3301  * normal PING packets. */
3302  node->flags &= ~CLUSTER_NODE_MEET;
3303 
3304  serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
3305  node->name, node->ip, node->cport);
3306  }
3307  }
3308  dictReleaseIterator(di);
3309 
3310  /* Ping some random node once every 10 iterations, so that we usually ping
3311  * one random node every second. */
3312  if (!(iteration % 10)) {
3313  int j;
3314 
3315  /* Check a few random nodes and ping the one with the oldest
3316  * pong_received time. */
3317  for (j = 0; j < 5; j++) {
3318  de = dictGetRandomKey(server.cluster->nodes);
3319  clusterNode *this = dictGetVal(de);
3320 
3321  /* Don't ping nodes disconnected or with a ping currently active. */
3322  if (this->link == NULL || this->ping_sent != 0) continue;
3323  if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
3324  continue;
3325  if (min_pong_node == NULL || min_pong > this->pong_received) {
3326  min_pong_node = this;
3327  min_pong = this->pong_received;
3328  }
3329  }
3330  if (min_pong_node) {
3331  serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
3332  clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
3333  }
3334  }
3335 
3336  /* Iterate nodes to check if we need to flag something as failing.
3337  * This loop is also responsible for the following:
3338  * 1) Check if there are orphaned masters (masters without non failing
3339  * slaves).
3340  * 2) Count the max number of non failing slaves for a single master.
3341  * 3) Count the number of slaves for our master, if we are a slave. */
3342  orphaned_masters = 0;
3343  max_slaves = 0;
3344  this_slaves = 0;
3345  di = dictGetSafeIterator(server.cluster->nodes);
3346  while((de = dictNext(di)) != NULL) {
3347  clusterNode *node = dictGetVal(de);
3348  now = mstime(); /* Use an updated time at every iteration. */
3349  mstime_t delay;
3350 
3351  if (node->flags &
3352  (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
3353  continue;
3354 
3355  /* Orphaned master check, useful only if the current instance
3356  * is a slave that may migrate to another master. */
3357  if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
3358  int okslaves = clusterCountNonFailingSlaves(node);
3359 
3360  /* A master is orphaned if it is serving a non-zero number of
3361  * slots, has no working slaves, but used to have at least one
3362  * slave, or failed over a master that used to have slaves. */
3363  if (okslaves == 0 && node->numslots > 0 &&
3364  node->flags & CLUSTER_NODE_MIGRATE_TO)
3365  {
3366  orphaned_masters++;
3367  }
3368  if (okslaves > max_slaves) max_slaves = okslaves;
3369  if (nodeIsSlave(myself) && myself->slaveof == node)
3370  this_slaves = okslaves;
3371  }
3372 
3373  /* If we are waiting for the PONG more than half the cluster
3374  * timeout, reconnect the link: maybe there is a connection
3375  * issue even if the node is alive. */
3376  if (node->link && /* is connected */
3377  now - node->link->ctime >
3378  server.cluster_node_timeout && /* was not already reconnected */
3379  node->ping_sent && /* we already sent a ping */
3380  node->pong_received < node->ping_sent && /* still waiting pong */
3381  /* and we are waiting for the pong more than timeout/2 */
3382  now - node->ping_sent > server.cluster_node_timeout/2)
3383  {
3384  /* Disconnect the link, it will be reconnected automatically. */
3385  freeClusterLink(node->link);
3386  }
3387 
3388  /* If we have currently no active ping in this instance, and the
3389  * received PONG is older than half the cluster timeout, send
3390  * a new ping now, to ensure all the nodes are pinged without
3391  * too big a delay. */
3392  if (node->link &&
3393  node->ping_sent == 0 &&
3394  (now - node->pong_received) > server.cluster_node_timeout/2)
3395  {
3396  clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
3397  continue;
3398  }
3399 
3400  /* If we are a master and one of the slaves requested a manual
3401  * failover, ping it continuously. */
3402  if (server.cluster->mf_end &&
3403  nodeIsMaster(myself) &&
3404  server.cluster->mf_slave == node &&
3405  node->link)
3406  {
3407  clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
3408  continue;
3409  }
3410 
3411  /* Check only if we have an active ping for this instance. */
3412  if (node->ping_sent == 0) continue;
3413 
3414  /* Compute the delay of the PONG. Note that if we already received
3415  * the PONG, then node->ping_sent is zero, so we can't reach this
3416  * code at all. */
3417  delay = now - node->ping_sent;
3418 
3419  if (delay > server.cluster_node_timeout) {
3420  /* Timeout reached. Set the node as possibly failing if it is
3421  * not already in this state. */
3422  if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
3423  serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
3424  node->name);
3425  node->flags |= CLUSTER_NODE_PFAIL;
3426  update_state = 1;
3427  }
3428  }
3429  }
3430  dictReleaseIterator(di);
3431 
3432  /* If we are a slave node but the replication is still turned off,
3433  * enable it if we know the address of our master and it appears to
3434  * be up. */
3435  if (nodeIsSlave(myself) &&
3436  server.masterhost == NULL &&
3437  myself->slaveof &&
3438  nodeHasAddr(myself->slaveof))
3439  {
3440  replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
3441  }
3442 
3443  /* Abort a manual failover if the timeout is reached. */
3444  manualFailoverCheckTimeout();
3445 
3446  if (nodeIsSlave(myself)) {
3447  clusterHandleManualFailover();
3448  clusterHandleSlaveFailover();
3449  /* If there are orphaned masters, and we are a slave among the masters
3450  * with the max number of non-failing slaves, consider migrating to
3451  * the orphaned masters. Note that it does not make sense to try
3452  * a migration if there is no master with at least *two* working
3453  * slaves. */
3454  if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
3455  clusterHandleSlaveMigration(max_slaves);
3456  }
3457 
3458  if (update_state || server.cluster->state == CLUSTER_FAIL)
3459  clusterUpdateState();
3460 }
3461 
3462 /* This function is called before the event handler returns to sleep for
3463  * events. It is useful to perform operations that must be done ASAP in
3464  * reaction to events fired but that are not safe to perform inside event
3465  * handlers, or to perform potentially expensive tasks that we need to do
3466  * a single time before replying to clients. */
3467 void clusterBeforeSleep(void) {
3468  /* Handle failover, this is needed when it is likely that there is already
3469  * the quorum from masters in order to react fast. */
3470  if (server.cluster->todo_before_sleep & CLUSTER_TODO_HANDLE_FAILOVER)
3471  clusterHandleSlaveFailover();
3472 
3473  /* Update the cluster state. */
3474  if (server.cluster->todo_before_sleep & CLUSTER_TODO_UPDATE_STATE)
3475  clusterUpdateState();
3476 
3477  /* Save the config, possibly using fsync. */
3478  if (server.cluster->todo_before_sleep & CLUSTER_TODO_SAVE_CONFIG) {
3479  int fsync = server.cluster->todo_before_sleep &
3480  CLUSTER_TODO_FSYNC_CONFIG;
3481  clusterSaveConfigOrDie(fsync);
3482  }
3483 
3484  /* Reset our flags (not strictly needed since every single function
3485  * called for flags set should be able to clear its flag). */
3486  server.cluster->todo_before_sleep = 0;
3487 }
3488 
3489 void clusterDoBeforeSleep(int flags) {
3490  server.cluster->todo_before_sleep |= flags;
3491 }
3492 
3493 /* -----------------------------------------------------------------------------
3494  * Slots management
3495  * -------------------------------------------------------------------------- */
3496 
3497 /* Test bit 'pos' in a generic bitmap. Return 1 if the bit is set,
3498  * otherwise 0. */
3499 int bitmapTestBit(unsigned char *bitmap, int pos) {
3500  off_t byte = pos/8;
3501  int bit = pos&7;
3502  return (bitmap[byte] & (1<<bit)) != 0;
3503 }
3504 
3505 /* Set the bit at position 'pos' in a bitmap. */
3506 void bitmapSetBit(unsigned char *bitmap, int pos) {
3507  off_t byte = pos/8;
3508  int bit = pos&7;
3509  bitmap[byte] |= 1<<bit;
3510 }
3511 
3512 /* Clear the bit at position 'pos' in a bitmap. */
3513 void bitmapClearBit(unsigned char *bitmap, int pos) {
3514  off_t byte = pos/8;
3515  int bit = pos&7;
3516  bitmap[byte] &= ~(1<<bit);
3517 }
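/* Illustrative sketch, not part of the original cluster.c: how the three
 * bitmap helpers above operate on a per-node slots bitmap, which packs
 * CLUSTER_SLOTS (16384) bits into 16384/8 = 2048 bytes. */
#include <string.h>

void exampleSlotsBitmap(void) {
    unsigned char slots[16384/8];

    memset(slots, 0, sizeof(slots));
    bitmapSetBit(slots, 5460);           /* Claim slot 5460. */
    if (bitmapTestBit(slots, 5460))      /* Now reports 1. */
        bitmapClearBit(slots, 5460);     /* Release it again. */
}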
3518 
3519 /* Return non-zero if there is at least one master with slaves in the cluster.
3520  * Otherwise zero is returned. Used by clusterNodeSetSlotBit() to set the
3521  * MIGRATE_TO flag when a master gets its first slot. */
3522 int clusterMastersHaveSlaves(void) {
3523  dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
3524  dictEntry *de;
3525  int slaves = 0;
3526  while((de = dictNext(di)) != NULL) {
3527  clusterNode *node = dictGetVal(de);
3528 
3529  if (nodeIsSlave(node)) continue;
3530  slaves += node->numslaves;
3531  }
3532  dictReleaseIterator(di);
3533  return slaves != 0;
3534 }
3535 
3536 /* Set the slot bit and return the old value. */
3537 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
3538  int old = bitmapTestBit(n->slots,slot);
3539  bitmapSetBit(n->slots,slot);
3540  if (!old) {
3541  n->numslots++;
3542  /* When a master gets its first slot, even if it has no slaves,
3543  * it gets flagged with MIGRATE_TO, that is, the master is a valid
3544  * target for replicas migration, if and only if at least one of
3545  * the other masters has slaves right now.
3546  *
3547  * Normally masters are valid targets of replica migration if:
3548  * 1. They used to have slaves (but no longer have them).
3549  * 2. They are slaves failing over a master that used to have slaves.
3550  *
3551  * However new masters with slots assigned are considered valid
3552  * migration targets if the rest of the cluster is not entirely slave-less.
3553  *
3554  * See https://github.com/antirez/redis/issues/3043 for more info. */
3555  if (n->numslots == 1 && clusterMastersHaveSlaves())
3556  n->flags |= CLUSTER_NODE_MIGRATE_TO;
3557  }
3558  return old;
3559 }
3560 
3561 /* Clear the slot bit and return the old value. */
3562 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
3563  int old = bitmapTestBit(n->slots,slot);
3564  bitmapClearBit(n->slots,slot);
3565  if (old) n->numslots--;
3566  return old;
3567 }
3568 
3569 /* Return the slot bit from the cluster node structure. */
3570 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
3571  return bitmapTestBit(n->slots,slot);
3572 }
3573 
3574 /* Add the specified slot to the list of slots that node 'n' will
3575  * serve. Return C_OK if the operation ended with success.
3576  * If the slot is already assigned to another instance this is considered
3577  * an error and C_ERR is returned. */
3578 int clusterAddSlot(clusterNode *n, int slot) {
3579  if (server.cluster->slots[slot]) return C_ERR;
3580  clusterNodeSetSlotBit(n,slot);
3581  server.cluster->slots[slot] = n;
3582  return C_OK;
3583 }
3584 
3585 /* Delete the specified slot marking it as unassigned.
3586  * Returns C_OK if the slot was assigned, otherwise if the slot was
3587  * already unassigned C_ERR is returned. */
3588 int clusterDelSlot(int slot) {
3589  clusterNode *n = server.cluster->slots[slot];
3590 
3591  if (!n) return C_ERR;
3592  serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
3593  server.cluster->slots[slot] = NULL;
3594  return C_OK;
3595 }
3596 
3597 /* Delete all the slots associated with the specified node.
3598  * The number of deleted slots is returned. */
3599 int clusterDelNodeSlots(clusterNode *node) {
3600  int deleted = 0, j;
3601 
3602  for (j = 0; j < CLUSTER_SLOTS; j++) {
3603  if (!clusterNodeGetSlotBit(node,j)) continue;
3604  clusterDelSlot(j); deleted++; /* Count only slots actually served. */
3605  }
3606  return deleted;
3607 }
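/* Illustrative sketch, not part of the original cluster.c: the return-value
 * contract of clusterAddSlot()/clusterDelSlot() above. Adding refuses to
 * steal an owned slot and deleting refuses to unassign a free one, so a
 * caller that really wants to move a slot deletes it first, the same
 * del-then-add pattern used by CLUSTER SETSLOT <slot> NODE further below. */
void exampleMoveSlot(clusterNode *n, int slot) {
    if (clusterAddSlot(n, slot) == C_ERR) {
        /* Slot already assigned to some node: unassign it first... */
        clusterDelSlot(slot);
        /* ...then the assignment is guaranteed to succeed. */
        clusterAddSlot(n, slot);
    }
}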
3608 
3609 /* Clear the migrating / importing state for all the slots.
3610  * This is useful at initialization and when turning a master into a slave. */
3611 void clusterCloseAllSlots(void) {
3612  memset(server.cluster->migrating_slots_to,0,
3613  sizeof(server.cluster->migrating_slots_to));
3614  memset(server.cluster->importing_slots_from,0,
3615  sizeof(server.cluster->importing_slots_from));
3616 }
3617 
3618 /* -----------------------------------------------------------------------------
3619  * Cluster state evaluation function
3620  * -------------------------------------------------------------------------- */
3621 
3622 /* The following are defines that are only used in the evaluation function
3623  * and are based on heuristics. Actually the main point about the rejoin and
3624  * writable delay is that they should be a few orders of magnitude larger
3625  * than the network latency. */
3626 #define CLUSTER_MAX_REJOIN_DELAY 5000
3627 #define CLUSTER_MIN_REJOIN_DELAY 500
3628 #define CLUSTER_WRITABLE_DELAY 2000
3629 
3630 void clusterUpdateState(void) {
3631  int j, new_state;
3632  int reachable_masters = 0;
3633  static mstime_t among_minority_time;
3634  static mstime_t first_call_time = 0;
3635 
3636  server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;
3637 
3638  /* If this is a master node, wait some time before turning the state
3639  * into OK, since it is not a good idea to rejoin the cluster as a writable
3640  * master, after a reboot, without giving the cluster a chance to
3641  * reconfigure this node. Note that the delay is calculated starting from
3642  * the first call to this function and not since the server start, in order
3643  * not to count the DB loading time. */
3644  if (first_call_time == 0) first_call_time = mstime();
3645  if (nodeIsMaster(myself) &&
3646  server.cluster->state == CLUSTER_FAIL &&
3647  mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return;
3648 
3649  /* Start assuming the state is OK. We'll turn it into FAIL if there
3650  * are the right conditions. */
3651  new_state = CLUSTER_OK;
3652 
3653  /* Check if all the slots are covered. */
3654  if (server.cluster_require_full_coverage) {
3655  for (j = 0; j < CLUSTER_SLOTS; j++) {
3656  if (server.cluster->slots[j] == NULL ||
3657  server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL))
3658  {
3659  new_state = CLUSTER_FAIL;
3660  break;
3661  }
3662  }
3663  }
3664 
3665  /* Compute the cluster size, that is the number of master nodes
3666  * serving at least a single slot.
3667  *
3668  * At the same time count the number of reachable masters having
3669  * at least one slot. */
3670  {
3671  dictIterator *di;
3672  dictEntry *de;
3673 
3674  server.cluster->size = 0;
3675  di = dictGetSafeIterator(server.cluster->nodes);
3676  while((de = dictNext(di)) != NULL) {
3677  clusterNode *node = dictGetVal(de);
3678 
3679  if (nodeIsMaster(node) && node->numslots) {
3680  server.cluster->size++;
3681  if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0)
3682  reachable_masters++;
3683  }
3684  }
3685  dictReleaseIterator(di);
3686  }
3687 
3688  /* If we are in a minority partition, change the cluster state
3689  * to FAIL. */
3690  {
3691  int needed_quorum = (server.cluster->size / 2) + 1;
3692 
3693  if (reachable_masters < needed_quorum) {
3694  new_state = CLUSTER_FAIL;
3695  among_minority_time = mstime();
3696  }
3697  }
3698 
3699  /* Log a state change */
3700  if (new_state != server.cluster->state) {
3701  mstime_t rejoin_delay = server.cluster_node_timeout;
3702 
3703  /* If the instance is a master and was partitioned away with the
3704  * minority, don't let it accept queries for some time after the
3705  * partition heals, to make sure there is enough time to receive
3706  * a configuration update. */
3707  if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY)
3708  rejoin_delay = CLUSTER_MAX_REJOIN_DELAY;
3709  if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY)
3710  rejoin_delay = CLUSTER_MIN_REJOIN_DELAY;
3711 
3712  if (new_state == CLUSTER_OK &&
3713  nodeIsMaster(myself) &&
3714  mstime() - among_minority_time < rejoin_delay)
3715  {
3716  return;
3717  }
3718 
3719  /* Change the state and log the event. */
3720  serverLog(LL_WARNING,"Cluster state changed: %s",
3721  new_state == CLUSTER_OK ? "ok" : "fail");
3722  server.cluster->state = new_state;
3723  }
3724 }
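/* Illustrative sketch, not part of the original cluster.c: the majority
 * partition test applied above. The cluster size counts masters serving at
 * least one slot; a node considers itself on the majority side only if it
 * can reach more than half of them. */
int exampleHasQuorum(int cluster_size, int reachable_masters) {
    int needed_quorum = (cluster_size / 2) + 1;
    return reachable_masters >= needed_quorum;
}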
3725 
3726 /* This function is called after the node startup in order to verify that data
3727  * loaded from disk is in agreement with the cluster configuration:
3728  *
3729  * 1) If we find keys about hash slots we have no responsibility for, the
3730  * following happens:
3731  * A) If no other node is in charge according to the current cluster
3732  * configuration, we add these slots to our node.
3733  * B) If according to our config other nodes are already in charge of
3734  * these slots, we set the slots as IMPORTING from our point of view
3735  * in order to justify we have those slots, and in order to make
3736  * redis-trib aware of the issue, so that it can try to fix it.
3737  * 2) If we find data in a DB different than DB0 we return C_ERR to
3738  * signal the caller it should quit the server with an error message
3739  * or take other actions.
3740  *
3741  * The function always returns C_OK even if it will try to correct
3742  * the error described in "1". However if data is found in DB different
3743  * from DB0, C_ERR is returned.
3744  *
3745  * The function also uses the logging facility in order to warn the user
3746  * about desynchronizations between the data we have in memory and the
3747  * cluster configuration. */
3748 int verifyClusterConfigWithData(void) {
3749  int j;
3750  int update_config = 0;
3751 
3752  /* If this node is a slave, don't perform the check at all as we
3753  * completely depend on the replication stream. */
3754  if (nodeIsSlave(myself)) return C_OK;
3755 
3756  /* Make sure we only have keys in DB0. */
3757  for (j = 1; j < server.dbnum; j++) {
3758  if (dictSize(server.db[j].dict)) return C_ERR;
3759  }
3760 
3761  /* Check that all the slots we see populated in memory have a corresponding
3762  * entry in the cluster table. Otherwise fix the table. */
3763  for (j = 0; j < CLUSTER_SLOTS; j++) {
3764  if (!countKeysInSlot(j)) continue; /* No keys in this slot. */
3765  /* Check if we are assigned to this slot or if we are importing it.
3766  * In both cases check the next slot as the configuration makes
3767  * sense. */
3768  if (server.cluster->slots[j] == myself ||
3769  server.cluster->importing_slots_from[j] != NULL) continue;
3770 
3771  /* If we are here data and cluster config don't agree, and we have
3772  * slot 'j' populated even if we are not importing it, nor we are
3773  * assigned to this slot. Fix this condition. */
3774 
3775  update_config++;
3776  /* Case A: slot is unassigned. Take responsibility for it. */
3777  if (server.cluster->slots[j] == NULL) {
3778  serverLog(LL_WARNING, "I have keys for unassigned slot %d. "
3779  "Taking responsibility for it.",j);
3781  } else {
3782  serverLog(LL_WARNING, "I have keys for slot %d, but the slot is "
3783  "assigned to another node. "
3784  "Setting it to importing state.",j);
3785  server.cluster->importing_slots_from[j] = server.cluster->slots[j];
3786  }
3787  }
3788  if (update_config) clusterSaveConfigOrDie(1);
3789  return C_OK;
3790 }
3791 
3792 /* -----------------------------------------------------------------------------
3793  * SLAVE nodes handling
3794  * -------------------------------------------------------------------------- */
3795 
3796 /* Set the specified node 'n' as master for this node.
3797  * If this node is currently a master, it is turned into a slave. */
3798 void clusterSetMaster(clusterNode *n) {
3799  serverAssert(n != myself);
3800  serverAssert(myself->numslots == 0);
3801 
3802  if (nodeIsMaster(myself)) {
3803  myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
3804  myself->flags |= CLUSTER_NODE_SLAVE;
3805  clusterCloseAllSlots();
3806  } else {
3807  if (myself->slaveof)
3808  clusterNodeRemoveSlave(myself->slaveof,myself);
3809  }
3810  myself->slaveof = n;
3811  clusterNodeAddSlave(n,myself);
3812  replicationSetMaster(n->ip, n->port);
3813  resetManualFailover();
3814 }
3815 
3816 /* -----------------------------------------------------------------------------
3817  * Nodes to string representation functions.
3818  * -------------------------------------------------------------------------- */
3819 
3820 struct redisNodeFlags {
3821  uint16_t flag;
3822  char *name;
3823 };
3824 
3825 static struct redisNodeFlags redisNodeFlagsTable[] = {
3826  {CLUSTER_NODE_MYSELF, "myself,"},
3827  {CLUSTER_NODE_MASTER, "master,"},
3828  {CLUSTER_NODE_SLAVE, "slave,"},
3829  {CLUSTER_NODE_PFAIL, "fail?,"},
3830  {CLUSTER_NODE_FAIL, "fail,"},
3831  {CLUSTER_NODE_HANDSHAKE, "handshake,"},
3832  {CLUSTER_NODE_NOADDR, "noaddr,"}
3833 };
3834 
3835 /* Concatenate the comma separated list of node flags to the given SDS
3836  * string 'ci'. */
3837 static sds representClusterNodeFlags(sds ci, uint16_t flags) {
3838  if (flags == 0) {
3839  ci = sdscat(ci,"noflags,");
3840  } else {
3841  int i, size = sizeof(redisNodeFlagsTable)/sizeof(struct redisNodeFlags);
3842  for (i = 0; i < size; i++) {
3843  struct redisNodeFlags *nodeflag = redisNodeFlagsTable + i;
3844  if (flags & nodeflag->flag) ci = sdscat(ci, nodeflag->name);
3845  }
3846  }
3847  sdsIncrLen(ci,-1); /* Remove trailing comma. */
3848  return ci;
3849 }
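/* Illustrative usage sketch, not part of the original cluster.c: for a node
 * that is both myself and a master, the helper above appends
 * "myself,master," and then trims the trailing comma, producing the flags
 * field printed by CLUSTER NODES. */
void exampleRepresentFlags(void) {
    sds ci = sdsempty();
    ci = representClusterNodeFlags(ci, CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
    /* ci now holds "myself,master". */
    sdsfree(ci);
}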
3850 
3851 /* Generate a csv-alike representation of the specified cluster node.
3852  * See clusterGenNodesDescription() top comment for more information.
3853  *
3854  * The function returns the string representation as an SDS string. */
3855 sds clusterGenNodeDescription(clusterNode *node) {
3856  int j, start;
3857  sds ci;
3858 
3859  /* Node coordinates */
3860  ci = sdscatprintf(sdsempty(),"%.40s %s:%d@%d ",
3861  node->name,
3862  node->ip,
3863  node->port,
3864  node->cport);
3865 
3866  /* Flags */
3867  ci = representClusterNodeFlags(ci, node->flags);
3868 
3869  /* Slave of... or just "-" */
3870  if (node->slaveof)
3871  ci = sdscatprintf(ci," %.40s ",node->slaveof->name);
3872  else
3873  ci = sdscatlen(ci," - ",3);
3874 
3875  /* Latency from the POV of this node, config epoch, link status */
3876  ci = sdscatprintf(ci,"%lld %lld %llu %s",
3877  (long long) node->ping_sent,
3878  (long long) node->pong_received,
3879  (unsigned long long) node->configEpoch,
3880  (node->link || node->flags & CLUSTER_NODE_MYSELF) ?
3881  "connected" : "disconnected");
3882 
3883  /* Slots served by this instance */
3884  start = -1;
3885  for (j = 0; j < CLUSTER_SLOTS; j++) {
3886  int bit;
3887 
3888  if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
3889  if (start == -1) start = j;
3890  }
3891  if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
3892  if (bit && j == CLUSTER_SLOTS-1) j++;
3893 
3894  if (start == j-1) {
3895  ci = sdscatprintf(ci," %d",start);
3896  } else {
3897  ci = sdscatprintf(ci," %d-%d",start,j-1);
3898  }
3899  start = -1;
3900  }
3901  }
3902 
3903  /* For the MYSELF node only, we also dump info about slots that
3904  * we are migrating to other instances or importing from other
3905  * instances. */
3906  if (node->flags & CLUSTER_NODE_MYSELF) {
3907  for (j = 0; j < CLUSTER_SLOTS; j++) {
3908  if (server.cluster->migrating_slots_to[j]) {
3909  ci = sdscatprintf(ci," [%d->-%.40s]",j,
3910  server.cluster->migrating_slots_to[j]->name);
3911  } else if (server.cluster->importing_slots_from[j]) {
3912  ci = sdscatprintf(ci," [%d-<-%.40s]",j,
3913  server.cluster->importing_slots_from[j]->name);
3914  }
3915  }
3916  }
3917  return ci;
3918 }
3919 
3920 /* Generate a csv-alike representation of the nodes we are aware of,
3921  * including the "myself" node, and return an SDS string containing the
3922  * representation (it is up to the caller to free it).
3923  *
3924  * All the nodes matching at least one of the node flags specified in
3925  * "filter" are excluded from the output, so using zero as a filter will
3926  * include all the known nodes in the representation, including nodes in
3927  * the HANDSHAKE state.
3928  *
3929  * The representation obtained using this function is used for the output
3930  * of the CLUSTER NODES function, and as format for the cluster
3931  * configuration file (nodes.conf) for a given node. */
3932 sds clusterGenNodesDescription(int filter) {
3933  sds ci = sdsempty(), ni;
3934  dictIterator *di;
3935  dictEntry *de;
3936 
3937  di = dictGetSafeIterator(server.cluster->nodes);
3938  while((de = dictNext(di)) != NULL) {
3939  clusterNode *node = dictGetVal(de);
3940 
3941  if (node->flags & filter) continue;
3942  ni = clusterGenNodeDescription(node);
3943  ci = sdscatsds(ci,ni);
3944  sdsfree(ni);
3945  ci = sdscatlen(ci,"\n",1);
3946  }
3947  dictReleaseIterator(di);
3948  return ci;
3949 }
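/* Purely illustrative example, not part of the original cluster.c (the node
 * IDs and addresses are made up): one line produced by the two functions
 * above, in the format used by CLUSTER NODES and nodes.conf:
 *
 *   07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30004@40004 slave
 *   e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 4 connected
 *
 * (wrapped here for readability: the real output is a single line, and
 * master lines end with their slot ranges, e.g. "0-5460"). */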
3950 
3951 /* -----------------------------------------------------------------------------
3952  * CLUSTER command
3953  * -------------------------------------------------------------------------- */
3954 
3955 const char *clusterGetMessageTypeString(int type) {
3956  switch(type) {
3957  case CLUSTERMSG_TYPE_PING: return "ping";
3958  case CLUSTERMSG_TYPE_PONG: return "pong";
3959  case CLUSTERMSG_TYPE_MEET: return "meet";
3960  case CLUSTERMSG_TYPE_FAIL: return "fail";
3961  case CLUSTERMSG_TYPE_PUBLISH: return "publish";
3962  case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
3963  case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
3964  case CLUSTERMSG_TYPE_UPDATE: return "update";
3965  case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
3966  }
3967  return "unknown";
3968 }
3969 
3970 int getSlotOrReply(client *c, robj *o) {
3971  long long slot;
3972 
3973  if (getLongLongFromObject(o,&slot) != C_OK ||
3974  slot < 0 || slot >= CLUSTER_SLOTS)
3975  {
3976  addReplyError(c,"Invalid or out of range slot");
3977  return -1;
3978  }
3979  return (int) slot;
3980 }
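/* Illustrative sketch, not part of the original cluster.c: how a key maps
 * to one of the CLUSTER_SLOTS hash slots, as used by the CLUSTER KEYSLOT
 * subcommand below. The real implementation is keyHashSlot(); here
 * exampleCrc16() is a hypothetical stand-in for the CRC16 function, and
 * only the hash-tag rule is shown in full: if the key contains a non-empty
 * "{...}" section, only that part is hashed, so related keys can be forced
 * into the same slot. */
unsigned int exampleCrc16(const char *buf, int len); /* assumed CRC16 impl */

unsigned int exampleKeyHashSlot(const char *key, int keylen) {
    int s, e; /* start-end indexes of '{' and '}' */

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;

    /* No '{'? Hash the whole key. */
    if (s == keylen) return exampleCrc16(key, keylen) & 0x3FFF;

    /* '{' found, check for the matching '}'. */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;

    /* No '}' or nothing between {}? Hash the whole key. */
    if (e == keylen || e == s+1) return exampleCrc16(key, keylen) & 0x3FFF;

    /* Hash only what is between '{' and '}'. */
    return exampleCrc16(key+s+1, e-s-1) & 0x3FFF;
}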
3981 
3983  /* Format: 1) 1) start slot
3984  * 2) end slot
3985  * 3) 1) master IP
3986  * 2) master port
3987  * 3) node ID
3988  * 4) 1) replica IP
3989  * 2) replica port
3990  * 3) node ID
3991  * ... continued until done
3992  */
3993 
3994  int num_masters = 0;
3995  void *slot_replylen = addDeferredMultiBulkLength(c);
3996 
3997  dictEntry *de;
3998  dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
3999  while((de = dictNext(di)) != NULL) {
4000  clusterNode *node = dictGetVal(de);
4001  int j = 0, start = -1;
4002 
4003  /* Skip slaves (that are iterated when producing the output of their
4004  * master) and masters not serving any slot. */
4005  if (!nodeIsMaster(node) || node->numslots == 0) continue;
4006 
4007  for (j = 0; j < CLUSTER_SLOTS; j++) {
4008  int bit, i;
4009 
4010  if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
4011  if (start == -1) start = j;
4012  }
4013  if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
4014  int nested_elements = 3; /* slots (2) + master addr (1). */
4015  void *nested_replylen = addDeferredMultiBulkLength(c);
4016 
4017  if (bit && j == CLUSTER_SLOTS-1) j++;
4018 
4019  /* If slot exists in output map, add to its list.
4020  * else, create a new output map for this slot */
4021  if (start == j-1) {
4022  addReplyLongLong(c, start); /* only one slot; low==high */
4023  addReplyLongLong(c, start);
4024  } else {
4025  addReplyLongLong(c, start); /* low */
4026  addReplyLongLong(c, j-1); /* high */
4027  }
4028  start = -1;
4029 
4030  /* First node reply position is always the master */
4031  addReplyMultiBulkLen(c, 3);
4032  addReplyBulkCString(c, node->ip);
4033  addReplyLongLong(c, node->port);
4034  addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
4035 
4036  /* Remaining nodes in reply are replicas for slot range */
4037  for (i = 0; i < node->numslaves; i++) {
4038  /* This loop is copy/pasted from clusterGenNodeDescription()
4039  * with modifications for per-slot node aggregation */
4040  if (nodeFailed(node->slaves[i])) continue;
4041  addReplyMultiBulkLen(c, 3);
4042  addReplyBulkCString(c, node->slaves[i]->ip);
4043  addReplyLongLong(c, node->slaves[i]->port);
4044  addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
4045  nested_elements++;
4046  }
4047  setDeferredMultiBulkLength(c, nested_replylen, nested_elements);
4048  num_masters++;
4049  }
4050  }
4051  }
4052  dictReleaseIterator(di);
4053  setDeferredMultiBulkLength(c, slot_replylen, num_masters);
4054 }
4055 
4056 void clusterCommand(client *c) {
4057  if (server.cluster_enabled == 0) {
4058  addReplyError(c,"This instance has cluster support disabled");
4059  return;
4060  }
4061 
4062  if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
4063  /* CLUSTER MEET <ip> <port> [cport] */
4064  long long port, cport;
4065 
4066  if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
4067  addReplyErrorFormat(c,"Invalid TCP base port specified: %s",
4068  (char*)c->argv[3]->ptr);
4069  return;
4070  }
4071 
4072  if (c->argc == 5) {
4073  if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
4074  addReplyErrorFormat(c,"Invalid TCP bus port specified: %s",
4075  (char*)c->argv[4]->ptr);
4076  return;
4077  }
4078  } else {
4079  cport = port + CLUSTER_PORT_INCR;
4080  }
4081 
4082  if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 &&
4083  errno == EINVAL)
4084  {
4085  addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
4086  (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
4087  } else {
4088  addReply(c,shared.ok);
4089  }
4090  } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
4091  /* CLUSTER NODES */
4092  robj *o;
4093  sds ci = clusterGenNodesDescription(0);
4094 
4095  o = createObject(OBJ_STRING,ci);
4096  addReplyBulk(c,o);
4097  decrRefCount(o);
4098  } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
4099  /* CLUSTER MYID */
4100  addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN);
4101  } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
4102  /* CLUSTER SLOTS */
4104  } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
4105  /* CLUSTER FLUSHSLOTS */
4106  if (dictSize(server.db[0].dict) != 0) {
4107  addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
4108  return;
4109  }
4110  clusterDelNodeSlots(myself);
4111  clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4112  addReply(c,shared.ok);
4113  } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
4114  !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
4115  {
4116  /* CLUSTER ADDSLOTS <slot> [slot] ... */
4117  /* CLUSTER DELSLOTS <slot> [slot] ... */
4118  int j, slot;
4119  unsigned char *slots = zmalloc(CLUSTER_SLOTS);
4120  int del = !strcasecmp(c->argv[1]->ptr,"delslots");
4121 
4122  memset(slots,0,CLUSTER_SLOTS);
4123  /* Check that all the arguments are parseable and that all the
4124  * slots are not already busy. */
4125  for (j = 2; j < c->argc; j++) {
4126  if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
4127  zfree(slots);
4128  return;
4129  }
4130  if (del && server.cluster->slots[slot] == NULL) {
4131  addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
4132  zfree(slots);
4133  return;
4134  } else if (!del && server.cluster->slots[slot]) {
4135  addReplyErrorFormat(c,"Slot %d is already busy", slot);
4136  zfree(slots);
4137  return;
4138  }
4139  if (slots[slot]++ == 1) {
4140  addReplyErrorFormat(c,"Slot %d specified multiple times",
4141  (int)slot);
4142  zfree(slots);
4143  return;
4144  }
4145  }
4146  for (j = 0; j < CLUSTER_SLOTS; j++) {
4147  if (slots[j]) {
4148  int retval;
4149 
4150  /* If this slot was set as importing we can clear this
4151  * state as now we are the real owner of the slot. */
4152  if (server.cluster->importing_slots_from[j])
4153  server.cluster->importing_slots_from[j] = NULL;
4154 
4155  retval = del ? clusterDelSlot(j) :
4156  clusterAddSlot(myself,j);
4157  serverAssertWithInfo(c,NULL,retval == C_OK);
4158  }
4159  }
4160  zfree(slots);
4161  clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4162  addReply(c,shared.ok);
4163  } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
4164  /* SETSLOT 10 MIGRATING <node ID> */
4165  /* SETSLOT 10 IMPORTING <node ID> */
4166  /* SETSLOT 10 STABLE */
4167  /* SETSLOT 10 NODE <node ID> */
4168  int slot;
4169  clusterNode *n;
4170 
4171  if (nodeIsSlave(myself)) {
4172  addReplyError(c,"Please use SETSLOT only with masters.");
4173  return;
4174  }
4175 
4176  if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
4177 
4178  if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
4179  if (server.cluster->slots[slot] != myself) {
4180  addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
4181  return;
4182  }
4183  if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
4184  addReplyErrorFormat(c,"I don't know about node %s",
4185  (char*)c->argv[4]->ptr);
4186  return;
4187  }
4188  server.cluster->migrating_slots_to[slot] = n;
4189  } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
4190  if (server.cluster->slots[slot] == myself) {
4191  addReplyErrorFormat(c,
4192  "I'm already the owner of hash slot %u",slot);
4193  return;
4194  }
4195  if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
4196  addReplyErrorFormat(c,"I don't know about node %s",
4197  (char*)c->argv[4]->ptr);
4198  return;
4199  }
4200  server.cluster->importing_slots_from[slot] = n;
4201  } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
4202  /* CLUSTER SETSLOT <SLOT> STABLE */
4203  server.cluster->importing_slots_from[slot] = NULL;
4204  server.cluster->migrating_slots_to[slot] = NULL;
4205  } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
4206  /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
4207  clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
4208 
4209  if (!n) {
4210  addReplyErrorFormat(c,"Unknown node %s",
4211  (char*)c->argv[4]->ptr);
4212  return;
4213  }
4214  /* If this hash slot was served by 'myself' before the switch,
4215  * make sure there are no longer local keys for this hash slot. */
4216  if (server.cluster->slots[slot] == myself && n != myself) {
4217  if (countKeysInSlot(slot) != 0) {
4218  addReplyErrorFormat(c,
4219  "Can't assign hashslot %d to a different node "
4220  "while I still hold keys for this hash slot.", slot);
4221  return;
4222  }
4223  }
4224  /* If this slot is in migrating status but we have no keys
4225  * for it, assigning the slot to another node will clear
4226  * the migrating status. */
4227  if (countKeysInSlot(slot) == 0 &&
4228  server.cluster->migrating_slots_to[slot])
4229  server.cluster->migrating_slots_to[slot] = NULL;
4230 
4231  /* If this node was importing this slot, assigning the slot to
4232  * itself also clears the importing status. */
4233  if (n == myself &&
4234  server.cluster->importing_slots_from[slot])
4235  {
4236  /* This slot was manually migrated, set this node configEpoch
4237  * to a new epoch so that the new version can be propagated
4238  * by the cluster.
4239  *
4240  * Note that if this ever results in a collision with another
4241  * node getting the same configEpoch, for example because a
4242  * failover happens at the same time we close the slot, the
4243  * configEpoch collision resolution will fix it assigning
4244  * a different epoch to each node. */
4245  clusterBumpConfigEpochWithoutConsensus();
4246  serverLog(LL_WARNING,
4247  "configEpoch updated after importing slot %d", slot);
4248  }
4249  server.cluster->importing_slots_from[slot] = NULL;
4250  }
4251  clusterDelSlot(slot);
4252  clusterAddSlot(n,slot);
4253  } else {
4254  addReplyError(c,
4255  "Invalid CLUSTER SETSLOT action or number of arguments");
4256  return;
4257  }
4258  clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
4259  addReply(c,shared.ok);
4260  } else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) {
4261  /* CLUSTER BUMPEPOCH */
4262  int retval = clusterBumpConfigEpochWithoutConsensus();
4263  sds reply = sdscatprintf(sdsempty(),"+%s %llu\r\n",
4264  (retval == C_OK) ? "BUMPED" : "STILL",
4265  (unsigned long long) myself->configEpoch);
4266  addReplySds(c,reply);
4267  } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
4268  /* CLUSTER INFO */
4269  char *statestr[] = {"ok","fail","needhelp"};
4270  int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
4271  uint64_t myepoch;
4272  int j;
4273 
4274  for (j = 0; j < CLUSTER_SLOTS; j++) {
4275  clusterNode *n = server.cluster->slots[j];
4276 
4277  if (n == NULL) continue;
4278  slots_assigned++;
4279  if (nodeFailed(n)) {
4280  slots_fail++;
4281  } else if (nodeTimedOut(n)) {
4282  slots_pfail++;
4283  } else {
4284  slots_ok++;
4285  }
4286  }
4287 
4288  myepoch = (nodeIsSlave(myself) && myself->slaveof) ?
4289  myself->slaveof->configEpoch : myself->configEpoch;
4290 
4291  sds info = sdscatprintf(sdsempty(),
4292  "cluster_state:%s\r\n"
4293  "cluster_slots_assigned:%d\r\n"
4294  "cluster_slots_ok:%d\r\n"
4295  "cluster_slots_pfail:%d\r\n"
4296  "cluster_slots_fail:%d\r\n"
4297  "cluster_known_nodes:%lu\r\n"
4298  "cluster_size:%d\r\n"
4299  "cluster_current_epoch:%llu\r\n"
4300  "cluster_my_epoch:%llu\r\n"
4301  , statestr[server.cluster->state],
4302  slots_assigned,
4303  slots_ok,
4304  slots_pfail,
4305  slots_fail,
4306  dictSize(server.cluster->nodes),
4307  server.cluster->size,
4308  (unsigned long long) server.cluster->currentEpoch,
4309  (unsigned long long) myepoch
4310  );
4311 
4312  /* Show stats about messages sent and received. */
4313  long long tot_msg_sent = 0;
4314  long long tot_msg_received = 0;
4315 
4316  for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
4317  if (server.cluster->stats_bus_messages_sent[i] == 0) continue;
4318  tot_msg_sent += server.cluster->stats_bus_messages_sent[i];
4319  info = sdscatprintf(info,
4320  "cluster_stats_messages_%s_sent:%lld\r\n",
4322  server.cluster->stats_bus_messages_sent[i]);
4323  }
4324  info = sdscatprintf(info,
4325  "cluster_stats_messages_sent:%lld\r\n", tot_msg_sent);
4326 
4327  for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
4328  if (server.cluster->stats_bus_messages_received[i] == 0) continue;
4329  tot_msg_received += server.cluster->stats_bus_messages_received[i];
4330  info = sdscatprintf(info,
4331  "cluster_stats_messages_%s_received:%lld\r\n",
4333  server.cluster->stats_bus_messages_received[i]);
4334  }
4335  info = sdscatprintf(info,
4336  "cluster_stats_messages_received:%lld\r\n", tot_msg_received);
4337 
4338  /* Produce the reply protocol. */
4339  addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
4340  (unsigned long)sdslen(info)));
4341  addReplySds(c,info);
4342  addReply(c,shared.crlf);
4343  } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) {
4344  int retval = clusterSaveConfig(1);
4345 
4346  if (retval == 0)
4347  addReply(c,shared.ok);
4348  else
4349  addReplyErrorFormat(c,"error saving the cluster node config: %s",
4350  strerror(errno));
4351  } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
4352  /* CLUSTER KEYSLOT <key> */
4353  sds key = c->argv[2]->ptr;
4354 
4355  addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
4356  } else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) {
4357  /* CLUSTER COUNTKEYSINSLOT <slot> */
4358  long long slot;
4359 
4360  if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
4361  return;
4362  if (slot < 0 || slot >= CLUSTER_SLOTS) {
4363  addReplyError(c,"Invalid slot");
4364  return;
4365  }
4366  addReplyLongLong(c,countKeysInSlot(slot));
4367  } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
4368  /* CLUSTER GETKEYSINSLOT <slot> <count> */
4369  long long maxkeys, slot;
4370  unsigned int numkeys, j;
4371  robj **keys;
4372 
4373  if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
4374  return;
4375  if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
4376  != C_OK)
4377  return;
4378  if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
4379  addReplyError(c,"Invalid slot or number of keys");
4380  return;
4381  }
4382 
4383  keys = zmalloc(sizeof(robj*)*maxkeys);
4384  numkeys = getKeysInSlot(slot, keys, maxkeys);
4385  addReplyMultiBulkLen(c,numkeys);
4386  for (j = 0; j < numkeys; j++) {
4387  addReplyBulk(c,keys[j]);
4388  decrRefCount(keys[j]);
4389  }
4390  zfree(keys);
4391  } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
4392  /* CLUSTER FORGET <NODE ID> */
4393  clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4394 
4395  if (!n) {
4396  addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4397  return;
4398  } else if (n == myself) {
4399  addReplyError(c,"I tried hard but I can't forget myself...");
4400  return;
4401  } else if (nodeIsSlave(myself) && myself->slaveof == n) {
4402  addReplyError(c,"Can't forget my master!");
4403  return;
4404  }
4405  clusterBlacklistAddNode(n);
4406  clusterDelNode(n);
4407  clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
4408  CLUSTER_TODO_SAVE_CONFIG);
4409  addReply(c,shared.ok);
4410  } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
4411  /* CLUSTER REPLICATE <NODE ID> */
4412  clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4413 
4414  /* Lookup the specified node in our table. */
4415  if (!n) {
4416  addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4417  return;
4418  }
4419 
4420  /* I can't replicate myself. */
4421  if (n == myself) {
4422  addReplyError(c,"Can't replicate myself");
4423  return;
4424  }
4425 
4426  /* Can't replicate a slave. */
4427  if (nodeIsSlave(n)) {
4428  addReplyError(c,"I can only replicate a master, not a slave.");
4429  return;
4430  }
4431 
4432  /* If the instance is currently a master, it must be empty (no
4433  * assigned slots, no keys) in order to start replicating some other
4434  * node. Slaves can switch to another master without issues. */
4435  if (nodeIsMaster(myself) &&
4436  (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
4437  addReplyError(c,
4438  "To set a master the node must be empty and "
4439  "without assigned slots.");
4440  return;
4441  }
4442 
4443  /* Set the master. */
4444  clusterSetMaster(n);
4445  clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4446  addReply(c,shared.ok);
4447  } else if (!strcasecmp(c->argv[1]->ptr,"slaves") && c->argc == 3) {
4448  /* CLUSTER SLAVES <NODE ID> */
4449  clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4450  int j;
4451 
4452  /* Lookup the specified node in our table. */
4453  if (!n) {
4454  addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4455  return;
4456  }
4457 
4458  if (nodeIsSlave(n)) {
4459  addReplyError(c,"The specified node is not a master");
4460  return;
4461  }
4462 
4463  addReplyMultiBulkLen(c,n->numslaves);
4464  for (j = 0; j < n->numslaves; j++) {
4465  sds ni = clusterGenNodeDescription(n->slaves[j]);
4466  addReplyBulkCString(c,ni);
4467  sdsfree(ni);
4468  }
4469  } else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") &&
4470  c->argc == 3)
4471  {
4472  /* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
4473  clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4474 
4475  if (!n) {
4476  addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4477  return;
4478  } else {
4479  addReplyLongLong(c,clusterNodeFailureReportsCount(n));
4480  }
4481  } else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
4482  (c->argc == 2 || c->argc == 3))
4483  {
4484  /* CLUSTER FAILOVER [FORCE|TAKEOVER] */
4485  int force = 0, takeover = 0;
4486 
4487  if (c->argc == 3) {
4488  if (!strcasecmp(c->argv[2]->ptr,"force")) {
4489  force = 1;
4490  } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
4491  takeover = 1;
4492  force = 1; /* Takeover also implies force. */
4493  } else {
4494  addReply(c,shared.syntaxerr);
4495  return;
4496  }
4497  }
4498 
4499  /* Check preconditions. */
4500  if (nodeIsMaster(myself)) {
4501  addReplyError(c,"You should send CLUSTER FAILOVER to a slave");
4502  return;
4503  } else if (myself->slaveof == NULL) {
4504  addReplyError(c,"I'm a slave but my master is unknown to me");
4505  return;
4506  } else if (!force &&
4507  (nodeFailed(myself->slaveof) ||
4508  myself->slaveof->link == NULL))
4509  {
4510  addReplyError(c,"Master is down or failed, "
4511  "please use CLUSTER FAILOVER FORCE");
4512  return;
4513  }
4514  resetManualFailover();
4515  server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
4516 
4517  if (takeover) {
4518  /* A takeover does not perform any initial check. It just
4519  * generates a new configuration epoch for this node without
4520  * consensus, claims the master's slots, and broadcasts the new
4521  * configuration. */
4522  serverLog(LL_WARNING,"Taking over the master (user request).");
4523  clusterBumpConfigEpochWithoutConsensus();
4524  clusterFailoverReplaceYourMaster();
4525  } else if (force) {
4526  /* If this is a forced failover, we don't need to talk with our
4527  * master to agree about the offset. We just failover taking over
4528  * it without coordination. */
4529  serverLog(LL_WARNING,"Forced failover user request accepted.");
4530  server.cluster->mf_can_start = 1;
4531  } else {
4532  serverLog(LL_WARNING,"Manual failover user request accepted.");
4533  clusterSendMFStart(myself->slaveof);
4534  }
4535  addReply(c,shared.ok);
4536  } else if (!strcasecmp(c->argv[1]->ptr,"set-config-epoch") && c->argc == 3)
4537  {
4538  /* CLUSTER SET-CONFIG-EPOCH <epoch>
4539  *
4540  * The user is allowed to set the config epoch only when a node is
4541  * totally fresh: no config epoch, no other known node, and so forth.
4542  * This happens at cluster creation time to start with a cluster where
4543  * every node has a different config epoch, without relying on the conflict
4544  * resolution system, which is too slow when a big cluster is created. */
4545  long long epoch;
4546 
4547  if (getLongLongFromObjectOrReply(c,c->argv[2],&epoch,NULL) != C_OK)
4548  return;
4549 
4550  if (epoch < 0) {
4551  addReplyErrorFormat(c,"Invalid config epoch specified: %lld",epoch);
4552  } else if (dictSize(server.cluster->nodes) > 1) {
4553  addReplyError(c,"The user can assign a config epoch only when the "
4554  "node does not know any other node.");
4555  } else if (myself->configEpoch != 0) {
4556  addReplyError(c,"Node config epoch is already non-zero");
4557  } else {
4558  myself->configEpoch = epoch;
4559  serverLog(LL_WARNING,
4560  "configEpoch set to %llu via CLUSTER SET-CONFIG-EPOCH",
4561  (unsigned long long) myself->configEpoch);
4562 
4563  if (server.cluster->currentEpoch < (uint64_t)epoch)
4564  server.cluster->currentEpoch = epoch;
4565  /* No need to fsync the config here since in the unlucky event
4566  * of a failure to persist the config, the conflict resolution code
4567  * will assign a unique config epoch to this node. */
4568  clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
4569  CLUSTER_TODO_SAVE_CONFIG);
4570  addReply(c,shared.ok);
4571  }
4572  } else if (!strcasecmp(c->argv[1]->ptr,"reset") &&
4573  (c->argc == 2 || c->argc == 3))
4574  {
4575  /* CLUSTER RESET [SOFT|HARD] */
4576  int hard = 0;
4577 
4578  /* Parse soft/hard argument. Default is soft. */
4579  if (c->argc == 3) {
4580  if (!strcasecmp(c->argv[2]->ptr,"hard")) {
4581  hard = 1;
4582  } else if (!strcasecmp(c->argv[2]->ptr,"soft")) {
4583  hard = 0;
4584  } else {
4585  addReply(c,shared.syntaxerr);
4586  return;
4587  }
4588  }
4589 
4590  /* Slaves can be reset while holding data, but master nodes must
4591  * be empty. */
4592  if (nodeIsMaster(myself) && dictSize(c->db->dict) != 0) {
4593  addReplyError(c,"CLUSTER RESET can't be called with "
4594  "master nodes containing keys");
4595  return;
4596  }
4597  clusterReset(hard);
4598  addReply(c,shared.ok);
4599  } else {
4600  addReplyError(c,"Wrong CLUSTER subcommand or number of arguments");
4601  }
4602 }
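
/* Editor's sketch (not part of the original file): the slot computation
 * behind the CLUSTER KEYSLOT branch above, mirroring keyHashSlot() defined
 * earlier in this file and assuming a crc16() implementation is available
 * (Redis ships one). A key hashes to CRC16(key) modulo 16384, except that a
 * non-empty {...} hash tag restricts hashing to the tag, so related keys
 * can be pinned to the same slot. */
unsigned int exampleKeyHashSlot(char *key, int keylen) {
    int s, e; /* start-end indexes of '{' and '}'. */

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;
    /* No '{': hash the whole key. 16384 slots, so mask with 0x3FFF. */
    if (s == keylen) return crc16(key,keylen) & 0x3FFF;
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;
    /* No '}' or an empty {} tag: again hash the whole key. */
    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;
    /* Otherwise hash only what is between '{' and '}'. */
    return crc16(key+s+1,e-s-1) & 0x3FFF;
}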
4603 
4604 /* -----------------------------------------------------------------------------
4605  * DUMP, RESTORE and MIGRATE commands
4606  * -------------------------------------------------------------------------- */
4607 
4608 /* Generates a DUMP-format representation of the object 'o', adding it to the
4609  * io stream pointed to by 'payload'. This function can't fail. */
4610 void createDumpPayload(rio *payload, robj *o) {
4611  unsigned char buf[2];
4612  uint64_t crc;
4613 
4614  /* Serialize the object in an RDB-like format. It consists of an object type
4615  * byte followed by the serialized object. This is understood by RESTORE. */
4616  rioInitWithBuffer(payload,sdsempty());
4617  serverAssert(rdbSaveObjectType(payload,o));
4618  serverAssert(rdbSaveObject(payload,o));
4619 
4620  /* Write the footer; this is how it looks:
4621  * ----------------+---------------------+---------------+
4622  * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
4623  * ----------------+---------------------+---------------+
4624  * RDB version and CRC are both in little endian.
4625  */
4626 
4627  /* RDB version */
4628  buf[0] = RDB_VERSION & 0xff;
4629  buf[1] = (RDB_VERSION >> 8) & 0xff;
4630  payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
4631 
4632  /* CRC64 */
4633  crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
4634  sdslen(payload->io.buffer.ptr));
4635  memrev64ifbe(&crc);
4636  payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
4637 }
4638 
4639 /* Verify that the RDB version of the dump payload is not newer than the one
4640  * supported by this Redis instance, and that the checksum is ok.
4641  * If the DUMP payload looks valid C_OK is returned, otherwise C_ERR
4642  * is returned. */
4643 int verifyDumpPayload(unsigned char *p, size_t len) {
4644  unsigned char *footer;
4645  uint16_t rdbver;
4646  uint64_t crc;
4647 
4648  /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
4649  if (len < 10) return C_ERR;
4650  footer = p+(len-10);
4651 
4652  /* Verify RDB version */
4653  rdbver = (footer[1] << 8) | footer[0];
4654  if (rdbver > RDB_VERSION) return C_ERR;
4655 
4656  /* Verify CRC64 */
4657  crc = crc64(0,p,len-8);
4658  memrev64ifbe(&crc);
4659  return (memcmp(&crc,footer+2,8) == 0) ? C_OK : C_ERR;
4660 }
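
/* Editor's sketch (not part of the original file): how the two helpers above
 * compose. createDumpPayload() always appends the 2-byte RDB version and the
 * 8-byte little-endian CRC64 that verifyDumpPayload() checks, so a freshly
 * generated payload must verify as C_OK: */
static void exampleDumpRoundTrip(robj *o) {
    rio payload;

    createDumpPayload(&payload,o);
    serverAssert(verifyDumpPayload((unsigned char*)payload.io.buffer.ptr,
                                   sdslen(payload.io.buffer.ptr)) == C_OK);
    sdsfree(payload.io.buffer.ptr);
}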
4661 
4662 /* DUMP keyname
4663  * DUMP is actually not used by Redis Cluster but it is the obvious
4664  * complement of RESTORE and can be useful for different applications. */
4665 void dumpCommand(client *c) {
4666  robj *o, *dumpobj;
4667  rio payload;
4668 
4669  /* Check if the key is here. */
4670  if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
4671  addReply(c,shared.nullbulk);
4672  return;
4673  }
4674 
4675  /* Create the DUMP encoded representation. */
4676  createDumpPayload(&payload,o);
4677 
4678  /* Transfer to the client */
4679  dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr);
4680  addReplyBulk(c,dumpobj);
4681  decrRefCount(dumpobj);
4682  return;
4683 }
4684 
4685 /* RESTORE key ttl serialized-value [REPLACE] */
4686 void restoreCommand(client *c) {
4687  long long ttl;
4688  rio payload;
4689  int j, type, replace = 0;
4690  robj *obj;
4691 
4692  /* Parse additional options */
4693  for (j = 4; j < c->argc; j++) {
4694  if (!strcasecmp(c->argv[j]->ptr,"replace")) {
4695  replace = 1;
4696  } else {
4697  addReply(c,shared.syntaxerr);
4698  return;
4699  }
4700  }
4701 
4702  /* Make sure this key does not already exist here... */
4703  if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) {
4704  addReply(c,shared.busykeyerr);
4705  return;
4706  }
4707 
4708  /* Check if the TTL value makes sense */
4709  if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
4710  return;
4711  } else if (ttl < 0) {
4712  addReplyError(c,"Invalid TTL value, must be >= 0");
4713  return;
4714  }
4715 
4716  /* Verify RDB version and data checksum. */
4717  if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR)
4718  {
4719  addReplyError(c,"DUMP payload version or checksum are wrong");
4720  return;
4721  }
4722 
4723  rioInitWithBuffer(&payload,c->argv[3]->ptr);
4724  if (((type = rdbLoadObjectType(&payload)) == -1) ||
4725  ((obj = rdbLoadObject(type,&payload)) == NULL))
4726  {
4727  addReplyError(c,"Bad data format");
4728  return;
4729  }
4730 
4731  /* Remove the old key if needed. */
4732  if (replace) dbDelete(c->db,c->argv[1]);
4733 
4734  /* Create the key and set the TTL if any */
4735  dbAdd(c->db,c->argv[1],obj);
4736  if (ttl) setExpire(c,c->db,c->argv[1],mstime()+ttl);
4737  signalModifiedKey(c->db,c->argv[1]);
4738  addReply(c,shared.ok);
4739  server.dirty++;
4740 }
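
/* Editor's note (not part of the original file): a typical client-side use
 * of the two commands above, in redis-cli syntax with the opaque payload
 * abbreviated. A TTL of 0 means no expire; REPLACE avoids the -BUSYKEY
 * error when the target key already exists:
 *
 *   127.0.0.1:6379> DUMP mykey
 *   "\x00\x03bar\b\x00..."
 *   127.0.0.1:6379> RESTORE mykey2 0 "\x00\x03bar\b\x00..." REPLACE
 *   OK
 */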
4741 
4742 /* MIGRATE socket cache implementation.
4743  *
4744  * We keep a map between host:port pairs and the TCP sockets we used to
4745  * connect to those instances recently.
4746  * These sockets are closed when the max number we cache is reached, and
4747  * also in serverCron() when they have been idle for more than a few seconds. */
4748 #define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
4749 #define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */
4750 
4751 typedef struct migrateCachedSocket {
4752  int fd;
4753  long last_dbid;
4754  time_t last_use_time;
4755 } migrateCachedSocket;
4756 
4757 /* Return a migrateCachedSocket containing a TCP socket connected with the
4758  * target instance, possibly returning a cached one.
4759  *
4760  * This function is responsible for sending errors to the client if a
4761  * connection can't be established. In this case NULL is returned.
4762  * Otherwise on success the socket is returned, and the caller should not
4763  * attempt to free it after usage.
4764  *
4765  * If the caller detects an error while using the socket, migrateCloseSocket()
4766  * should be called so that the connection will be created from scratch
4767  * the next time. */
4768 migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
4769  int fd;
4770  sds name = sdsempty();
4771  migrateCachedSocket *cs;
4772 
4773  /* Check if we have an already cached socket for this ip:port pair. */
4774  name = sdscatlen(name,host->ptr,sdslen(host->ptr));
4775  name = sdscatlen(name,":",1);
4776  name = sdscatlen(name,port->ptr,sdslen(port->ptr));
4777  cs = dictFetchValue(server.migrate_cached_sockets,name);
4778  if (cs) {
4779  sdsfree(name);
4780  cs->last_use_time = server.unixtime;
4781  return cs;
4782  }
4783 
4784  /* No cached socket, create one. */
4785  if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
4786  /* Too many items, drop one at random. */
4787  dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
4788  cs = dictGetVal(de);
4789  close(cs->fd);
4790  zfree(cs);
4791  dictDelete(server.migrate_cached_sockets,dictGetKey(de));
4792  }
4793 
4794  /* Create the socket */
4795  fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
4796  atoi(c->argv[2]->ptr));
4797  if (fd == -1) {
4798  sdsfree(name);
4799  addReplyErrorFormat(c,"Can't connect to target node: %s",
4800  server.neterr);
4801  return NULL;
4802  }
4803  anetEnableTcpNoDelay(server.neterr,fd);
4804 
4805  /* Check if it connects within the specified timeout. */
4806  if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
4807  sdsfree(name);
4808  addReplySds(c,
4809  sdsnew("-IOERR error or timeout connecting to the client\r\n"));
4810  close(fd);
4811  return NULL;
4812  }
4813 
4814  /* Add to the cache and return it to the caller. */
4815  cs = zmalloc(sizeof(*cs));
4816  cs->fd = fd;
4817  cs->last_dbid = -1;
4818  cs->last_use_time = server.unixtime;
4819  dictAdd(server.migrate_cached_sockets,name,cs);
4820  return cs;
4821 }
4822 
4823 /* Free a migrate cached connection. */
4824 void migrateCloseSocket(robj *host, robj *port) {
4825  sds name = sdsempty();
4826  migrateCachedSocket *cs;
4827 
4828  name = sdscatlen(name,host->ptr,sdslen(host->ptr));
4829  name = sdscatlen(name,":",1);
4830  name = sdscatlen(name,port->ptr,sdslen(port->ptr));
4831  cs = dictFetchValue(server.migrate_cached_sockets,name);
4832  if (!cs) {
4833  sdsfree(name);
4834  return;
4835  }
4836 
4837  close(cs->fd);
4838  zfree(cs);
4839  dictDelete(server.migrate_cached_sockets,name);
4840  sdsfree(name);
4841 }
4842 
4842 
4843 void migrateCloseTimedoutSockets(void) {
4844  dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
4845  dictEntry *de;
4846 
4847  while((de = dictNext(di)) != NULL) {
4848  migrateCachedSocket *cs = dictGetVal(de);
4849 
4850  if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
4851  close(cs->fd);
4852  zfree(cs);
4853  dictDelete(server.migrate_cached_sockets,dictGetKey(de));
4854  }
4855  }
4856  dictReleaseIterator(di);
4857 }
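
/* Editor's note (not part of the original file): putting the cache pieces
 * together, a cached connection is dropped either when it has been idle for
 * more than MIGRATE_SOCKET_CACHE_TTL seconds (the sweep above is driven by
 * serverCron()) or, on overflow past MIGRATE_SOCKET_CACHE_ITEMS targets,
 * when migrateGetSocket() evicts a random entry to make room. */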
4858 
4859 /* MIGRATE host port key dbid timeout [COPY | REPLACE]
4860  *
4861  * Or, in the multiple keys form:
4862  *
4863  * MIGRATE host port "" dbid timeout [COPY | REPLACE] KEYS key1 key2 ... keyN */
4864 void migrateCommand(client *c) {
4865  migrateCachedSocket *cs;
4866  int copy, replace, j;
4867  long timeout;
4868  long dbid;
4869  robj **ov = NULL; /* Objects to migrate. */
4870  robj **kv = NULL; /* Key names. */
4871  robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
4872  rio cmd, payload;
4873  int may_retry = 1;
4874  int write_error = 0;
4875  int argv_rewritten = 0;
4876 
4877  /* To support the KEYS option we need the following additional state. */
4878  int first_key = 3; /* Argument index of the first key. */
4879  int num_keys = 1; /* By default only migrate the 'key' argument. */
4880 
4881  /* Initialization */
4882  copy = 0;
4883  replace = 0;
4884 
4885  /* Parse additional options */
4886  for (j = 6; j < c->argc; j++) {
4887  if (!strcasecmp(c->argv[j]->ptr,"copy")) {
4888  copy = 1;
4889  } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
4890  replace = 1;
4891  } else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
4892  if (sdslen(c->argv[3]->ptr) != 0) {
4893  addReplyError(c,
4894  "When using MIGRATE KEYS option, the key argument"
4895  " must be set to the empty string");
4896  return;
4897  }
4898  first_key = j+1;
4899  num_keys = c->argc - j - 1;
4900  break; /* All the remaining args are keys. */
4901  } else {
4902  addReply(c,shared.syntaxerr);
4903  return;
4904  }
4905  }
4906 
4907  /* Sanity check */
4908  if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
4909  getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
4910  {
4911  return;
4912  }
4913  if (timeout <= 0) timeout = 1000;
4914 
4915  /* Check if the keys are here. If at least one key is to migrate, do it;
4916  * otherwise, if all the keys are missing, reply with "NOKEY" to signal
4917  * the caller that there was nothing to migrate. We don't return an error in
4918  * this case, since often this is due to a normal condition like the key
4919  * expiring in the meantime. */
4920  ov = zrealloc(ov,sizeof(robj*)*num_keys);
4921  kv = zrealloc(kv,sizeof(robj*)*num_keys);
4922  int oi = 0;
4923 
4924  for (j = 0; j < num_keys; j++) {
4925  if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
4926  kv[oi] = c->argv[first_key+j];
4927  oi++;
4928  }
4929  }
4930  num_keys = oi;
4931  if (num_keys == 0) {
4932  zfree(ov); zfree(kv);
4933  addReplySds(c,sdsnew("+NOKEY\r\n"));
4934  return;
4935  }
4936 
4937 try_again:
4938  write_error = 0;
4939 
4940  /* Connect */
4941  cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
4942  if (cs == NULL) {
4943  zfree(ov); zfree(kv);
4944  return; /* error sent to the client by migrateGetSocket() */
4945  }
4946 
4947  rioInitWithBuffer(&cmd,sdsempty());
4948 
4949  /* Send the SELECT command if the current DB is not already selected. */
4950  int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
4951  if (select) {
4952  serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
4953  serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
4954  serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
4955  }
4956 
4957  /* Create RESTORE payload and generate the protocol to call the command. */
4958  for (j = 0; j < num_keys; j++) {
4959  long long ttl = 0;
4960  long long expireat = getExpire(c->db,kv[j]);
4961 
4962  if (expireat != -1) {
4963  ttl = expireat-mstime();
4964  if (ttl < 1) ttl = 1;
4965  }
4966  serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
4967  if (server.cluster_enabled)
4968  serverAssertWithInfo(c,NULL,
4969  rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
4970  else
4971  serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
4972  serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
4973  serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
4974  sdslen(kv[j]->ptr)));
4975  serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
4976 
4977  /* Emit the payload argument, that is the serialized object using
4978  * the DUMP format. */
4979  createDumpPayload(&payload,ov[j]);
4980  serverAssertWithInfo(c,NULL,
4981  rioWriteBulkString(&cmd,payload.io.buffer.ptr,
4982  sdslen(payload.io.buffer.ptr)));
4983  sdsfree(payload.io.buffer.ptr);
4984 
4985  /* Add the REPLACE option to the RESTORE command if it was specified
4986  * as a MIGRATE option. */
4987  if (replace)
4988  serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
4989  }
4990 
4991  /* Transfer the query to the other node in 64K chunks. */
4992  errno = 0;
4993  {
4994  sds buf = cmd.io.buffer.ptr;
4995  size_t pos = 0, towrite;
4996  int nwritten = 0;
4997 
4998  while ((towrite = sdslen(buf)-pos) > 0) {
4999  towrite = (towrite > (64*1024) ? (64*1024) : towrite);
5000  nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout);
5001  if (nwritten != (signed)towrite) {
5002  write_error = 1;
5003  goto socket_err;
5004  }
5005  pos += nwritten;
5006  }
5007  }
5008 
5009  char buf1[1024]; /* Select reply. */
5010  char buf2[1024]; /* Restore reply. */
5011 
5012  /* Read the SELECT reply if needed. */
5013  if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0)
5014  goto socket_err;
5015 
5016  /* Read the RESTORE replies. */
5017  int error_from_target = 0;
5018  int socket_error = 0;
5019  int del_idx = 1; /* Index of the key argument for the replicated DEL op. */
5020 
5021  if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
5022 
5023  for (j = 0; j < num_keys; j++) {
5024  if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) {
5025  socket_error = 1;
5026  break;
5027  }
5028  if ((select && buf1[0] == '-') || buf2[0] == '-') {
5029  /* On error assume that last_dbid is no longer valid. */
5030  if (!error_from_target) {
5031  cs->last_dbid = -1;
5032  addReplyErrorFormat(c,"Target instance replied with error: %s",
5033  (select && buf1[0] == '-') ? buf1+1 : buf2+1);
5034  error_from_target = 1;
5035  }
5036  } else {
5037  if (!copy) {
5038  /* No COPY option: remove the local key, signal the change. */
5039  dbDelete(c->db,kv[j]);
5040  signalModifiedKey(c->db,kv[j]);
5041  server.dirty++;
5042 
5043  /* Populate the argument vector to replace the old one. */
5044  newargv[del_idx++] = kv[j];
5045  incrRefCount(kv[j]);
5046  }
5047  }
5048  }
5049 
5050  /* On socket error, if we want to retry, do it now before rewriting the
5051  * command vector. We only retry if we are sure nothing was processed
5052  * and we failed to read the first reply (j == 0 test). */
5053  if (!error_from_target && socket_error && j == 0 && may_retry &&
5054  errno != ETIMEDOUT)
5055  {
5056  goto socket_err; /* A retry is guaranteed because of tested conditions.*/
5057  }
5058 
5059  /* On socket errors, close the migration socket now, while we still have
5060  * the original host/port in the ARGV. Later the original command may be
5061  * rewritten as a DEL and it will be too late. */
5062  if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
5063 
5064  if (!copy) {
5065  /* Translate MIGRATE as DEL for replication/AOF. Note that we do
5066  * this only for the keys for which we received an acknowledgement
5067  * from the receiving Redis server, by using the del_idx index. */
5068  if (del_idx > 1) {
5069  newargv[0] = createStringObject("DEL",3);
5070  /* Note that the following call takes ownership of newargv. */
5071  replaceClientCommandVector(c,del_idx,newargv);
5072  argv_rewritten = 1;
5073  } else {
5074  /* No key transfer acknowledged, no need to rewrite as DEL. */
5075  zfree(newargv);
5076  }
5077  newargv = NULL; /* Make it safe to call zfree() on it in the future. */
5078  }
5079 
5080  /* If we are here and a socket error happened, we don't want to retry.
5081  * Just signal the problem to the client, but only do it if we did not
5082  * already queue a different error reported by the destination server. */
5083  if (!error_from_target && socket_error) {
5084  may_retry = 0;
5085  goto socket_err;
5086  }
5087 
5088  if (!error_from_target) {
5089  /* Success! Update the last_dbid in migrateCachedSocket, so that we can
5090  * avoid SELECT the next time if the target DB is the same. Reply +OK.
5091  *
5092  * Note: If we reached this point, even if socket_error is true,
5093  * the SELECT command still succeeded (otherwise the code jumps to
5094  * the socket_err label). */
5095  cs->last_dbid = dbid;
5096  addReply(c,shared.ok);
5097  } else {
5098  /* On error we already sent it in the for loop above, and set
5099  * the cached last_dbid to -1 to force a SELECT the next time. */
5100  }
5101 
5102  sdsfree(cmd.io.buffer.ptr);
5103  zfree(ov); zfree(kv); zfree(newargv);
5104  return;
5105 
5106 /* On socket errors we try to close the cached socket and try again.
5107  * It is very common for the cached socket to get closed; if just reopening
5108  * it works, it's a shame to report the error to the caller. */
5109 socket_err:
5110  /* Cleanup we want to perform in both the retry and no retry case.
5111  * Note: Closing the migrate socket will also force SELECT next time. */
5112  sdsfree(cmd.io.buffer.ptr);
5113 
5114  /* If the command was rewritten as DEL and there was a socket error,
5115  * we already closed the socket earlier. While migrateCloseSocket()
5116  * is idempotent, the host/port arguments are now gone, so don't do it
5117  * again. */
5118  if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
5119  zfree(newargv);
5120  newargv = NULL; /* This will get reallocated on retry. */
5121 
5122  /* Retry only if it's not a timeout and we never attempted a retry
5123  * (or the code jumping here did not set may_retry to zero). */
5124  if (errno != ETIMEDOUT && may_retry) {
5125  may_retry = 0;
5126  goto try_again;
5127  }
5128 
5129  /* Cleanup we want to do if no retry is attempted. */
5130  zfree(ov); zfree(kv);
5131  addReplySds(c,
5132  sdscatprintf(sdsempty(),
5133  "-IOERR error or timeout %s to target instance\r\n",
5134  write_error ? "writing" : "reading"));
5135  return;
5136 }
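
/* Editor's note (not part of the original file): the two invocation forms
 * handled above, as a client would send them:
 *
 *   MIGRATE 192.168.1.10 6380 mykey 0 5000
 *   MIGRATE 192.168.1.10 6380 "" 0 5000 KEYS key1 key2 key3
 *
 * Both pipeline one RESTORE (RESTORE-ASKING in cluster mode) per existing
 * key over the cached socket; with COPY the local keys are kept, and with
 * REPLACE the target overwrites existing keys instead of failing. */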
5137 
5138 /* -----------------------------------------------------------------------------
5139  * Cluster functions related to serving / redirecting clients
5140  * -------------------------------------------------------------------------- */
5141 
5142 /* The ASKING command is required after a -ASK redirection.
5143  * The client should issue ASKING before actually sending the command to
5144  * the target instance. See the Redis Cluster specification for more
5145  * information. */
5146 void askingCommand(client *c) {
5147  if (server.cluster_enabled == 0) {
5148  addReplyError(c,"This instance has cluster support disabled");
5149  return;
5150  }
5151  c->flags |= CLIENT_ASKING;
5152  addReply(c,shared.ok);
5153 }
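
/* Editor's note (not part of the original file): the redirection dance that
 * makes ASKING necessary, assuming slot 3999 is being migrated to
 * 127.0.0.1:6381:
 *
 *   GET foo            -> -ASK 3999 127.0.0.1:6381
 *   (then, on 127.0.0.1:6381)
 *   ASKING             -> +OK
 *   GET foo            -> the actual reply
 *
 * The CLIENT_ASKING flag set above is consumed by getNodeByQuery() when the
 * addressed slot is in importing state, and it only covers the next command,
 * so it must be re-issued for every redirected command. */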
5154 
5155 /* The READONLY command is used by clients to enter the read-only mode.
5156  * In this mode slaves will not redirect clients as long as they access
5157  * keys served by the slave's master using read-only commands. */
5158 void readonlyCommand(client *c) {
5159  if (server.cluster_enabled == 0) {
5160  addReplyError(c,"This instance has cluster support disabled");
5161  return;
5162  }
5163  c->flags |= CLIENT_READONLY;
5164  addReply(c,shared.ok);
5165 }
5166 
5167 /* The READWRITE command just clears the READONLY command state. */
5168 void readwriteCommand(client *c) {
5169  c->flags &= ~CLIENT_READONLY;
5170  addReply(c,shared.ok);
5171 }
5172 
5173 /* Return the pointer to the cluster node that is able to serve the command.
5174  * For the function to succeed the command should only target either:
5175  *
5176  * 1) A single key (even multiple times like RPOPLPUSH mylist mylist).
5177  * 2) Multiple keys in the same hash slot, while the slot is stable (no
5178  * resharding in progress).
5179  *
5180  * On success the function returns the node that is able to serve the request.
5181  * If the node is not 'myself' a redirection must be performed. The kind of
5182  * redirection is specified setting the integer passed by reference
5183  * 'error_code', which will be set to CLUSTER_REDIR_ASK or
5184  * CLUSTER_REDIR_MOVED.
5185  *
5186  * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
5187  *
5188  * If the command fails NULL is returned, and the reason of the failure is
5189  * provided via 'error_code', which will be set to:
5190  *
5191  * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
5192  * don't belong to the same hash slot.
5193  *
5194  * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
5195  * belonging to the same slot, but the slot is not stable (in migration or
5196  * importing state, likely because a resharding is in progress).
5197  *
5198  * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
5199  * not bound to any node. In this case the cluster global state should be
5200  * already "down" but it is fragile to rely on the update of the global state,
5201  * so we also handle it here.
5202  *
5203  * CLUSTER_REDIR_DOWN_STATE if the cluster is down but the user attempts to
5204  * execute a command that addresses one or more keys. */
5205 clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
5206  clusterNode *n = NULL;
5207  robj *firstkey = NULL;
5208  int multiple_keys = 0;
5209  multiState *ms, _ms;
5210  multiCmd mc;
5211  int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
5212 
5213  /* Set error code optimistically for the base case. */
5214  if (error_code) *error_code = CLUSTER_REDIR_NONE;
5215 
5216  /* We handle all the cases as if they were EXEC commands, so we have
5217  * a common code path for everything */
5218  if (cmd->proc == execCommand) {
5219  /* If CLIENT_MULTI flag is not set EXEC is just going to return an
5220  * error. */
5221  if (!(c->flags & CLIENT_MULTI)) return myself;
5222  ms = &c->mstate;
5223  } else {
5224  /* If the client is not in MULTI/EXEC state, create a fake Multi
5225  * State structure so that below we have a single code path for
5226  * both cases. */
5227  ms = &_ms;
5228  _ms.commands = &mc;
5229  _ms.count = 1;
5230  mc.argv = argv;
5231  mc.argc = argc;
5232  mc.cmd = cmd;
5233  }
5234 
5235  /* Check that all the keys are in the same hash slot, and obtain this
5236  * slot and the node associated. */
5237  for (i = 0; i < ms->count; i++) {
5238  struct redisCommand *mcmd;
5239  robj **margv;
5240  int margc, *keyindex, numkeys, j;
5241 
5242  mcmd = ms->commands[i].cmd;
5243  margc = ms->commands[i].argc;
5244  margv = ms->commands[i].argv;
5245 
5246  keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
5247  for (j = 0; j < numkeys; j++) {
5248  robj *thiskey = margv[keyindex[j]];
5249  int thisslot = keyHashSlot((char*)thiskey->ptr,
5250  sdslen(thiskey->ptr));
5251 
5252  if (firstkey == NULL) {
5253  /* This is the first key we see. Check what is the slot
5254  * and node. */
5255  firstkey = thiskey;
5256  slot = thisslot;
5257  n = server.cluster->slots[slot];
5258 
5259  /* Error: If a slot is not served, we are in "cluster down"
5260  * state. However the state is yet to be updated, so this was
5261  * not trapped earlier in processCommand(). Report the same
5262  * error to the client. */
5263  if (n == NULL) {
5264  getKeysFreeResult(keyindex);
5265  if (error_code)
5266  *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
5267  return NULL;
5268  }
5269 
5270  /* If we are migrating or importing this slot, we need to check
5271  * if we have all the keys in the request (the only way we
5272  * can safely serve the request, otherwise we return a TRYAGAIN
5273  * error). To do so we set the importing/migrating state and
5274  * increment a counter for every missing key. */
5275  if (n == myself &&
5276  server.cluster->migrating_slots_to[slot] != NULL)
5277  {
5278  migrating_slot = 1;
5279  } else if (server.cluster->importing_slots_from[slot] != NULL) {
5280  importing_slot = 1;
5281  }
5282  } else {
5283  /* If it is not the first key, make sure it is exactly
5284  * the same key as the first we saw. */
5285  if (!equalStringObjects(firstkey,thiskey)) {
5286  if (slot != thisslot) {
5287  /* Error: multiple keys from different slots. */
5288  getKeysFreeResult(keyindex);
5289  if (error_code)
5290  *error_code = CLUSTER_REDIR_CROSS_SLOT;
5291  return NULL;
5292  } else {
5293  /* Flag this request as one with multiple different
5294  * keys. */
5295  multiple_keys = 1;
5296  }
5297  }
5298  }
5299 
5300  /* Migrating / Importing slot? Count keys we don't have. */
5301  if ((migrating_slot || importing_slot) &&
5302  lookupKeyRead(&server.db[0],thiskey) == NULL)
5303  {
5304  missing_keys++;
5305  }
5306  }
5307  getKeysFreeResult(keyindex);
5308  }
5309 
5310  /* No key at all in the command? Then we can serve the request
5311  * without redirections or errors in all cases. */
5312  if (n == NULL) return myself;
5313 
5314  /* Cluster is globally down but we got keys? We can't serve the request. */
5315  if (server.cluster->state != CLUSTER_OK) {
5316  if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
5317  return NULL;
5318  }
5319 
5320  /* Return the hashslot by reference. */
5321  if (hashslot) *hashslot = slot;
5322 
5323  /* MIGRATE always works in the context of the local node if the slot
5324  * is open (migrating or importing state). We need to be able to freely
5325  * move keys among instances in this case. */
5326  if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
5327  return myself;
5328 
5329  /* If we don't have all the keys and we are migrating the slot, send
5330  * an ASK redirection. */
5331  if (migrating_slot && missing_keys) {
5332  if (error_code) *error_code = CLUSTER_REDIR_ASK;
5333  return server.cluster->migrating_slots_to[slot];
5334  }
5335 
5336  /* If we are receiving the slot, and the client correctly flagged the
5337  * request as "ASKING", we can serve the request. However if the request
5338  * involves multiple keys and we don't have them all, the only option is
5339  * to send a TRYAGAIN error. */
5340  if (importing_slot &&
5341  (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
5342  {
5343  if (multiple_keys && missing_keys) {
5344  if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
5345  return NULL;
5346  } else {
5347  return myself;
5348  }
5349  }
5350 
5351  /* Handle the read-only client case reading from a slave: if this
5352  * node is a slave and the request is about a hash slot our master
5353  * is serving, we can reply without redirection. */
5354  if (c->flags & CLIENT_READONLY &&
5355  cmd->flags & CMD_READONLY &&
5356  nodeIsSlave(myself) &&
5357  myself->slaveof == n)
5358  {
5359  return myself;
5360  }
5361 
5362  /* Base case: just return the right node. However if this node is not
5363  * myself, set error_code to MOVED since we need to issue a redirection. */
5364  if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
5365  return n;
5366 }
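
/* Editor's sketch (not part of the original file): the multi-key rules
 * enforced above, seen from a client. Keys without a common hash tag will
 * usually land in different slots and fail, while a shared {tag} forces a
 * single slot:
 *
 *   MSET user1:name a user2:name b        -> -CROSSSLOT Keys in request
 *                                             don't hash to the same slot
 *   MSET {user}:name a {user}:surname b   -> OK
 */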
5367 
5368 /* Send the client the right redirection code, according to error_code
5369  * that should be set to one of CLUSTER_REDIR_* macros.
5370  *
5371  * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
5372  * are used, then the node 'n' should not be NULL, but should be the
5373  * node we want to mention in the redirection. Moreover hashslot should
5374  * be set to the hash slot that caused the redirection. */
5375 void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
5376  if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
5377  addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
5378  } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
5379  /* The request spans multiple keys in the same slot,
5380  * but the slot is not "stable" currently as there is
5381  * a migration or import in progress. */
5382  addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
5383  } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
5384  addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
5385  } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
5386  addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n"));
5387  } else if (error_code == CLUSTER_REDIR_MOVED ||
5388  error_code == CLUSTER_REDIR_ASK)
5389  {
5390  addReplySds(c,sdscatprintf(sdsempty(),
5391  "-%s %d %s:%d\r\n",
5392  (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
5393  hashslot,n->ip,n->port));
5394  } else {
5395  serverPanic("getNodeByQuery() unknown error.");
5396  }
5397 }
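
/* Editor's note (not part of the original file): the wire format of the two
 * redirections emitted above, and how clients are expected to treat them:
 *
 *   -MOVED 3999 127.0.0.1:6381   slot permanently moved; update the local
 *                                slot map and retry there directly.
 *   -ASK 3999 127.0.0.1:6381     one-off redirect during migration; retry
 *                                there prefixed by ASKING, keep the map.
 */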
5398 
5399 /* This function is called by the function processing clients incrementally
5400  * to detect timeouts, in order to handle the following case:
5401  *
5402  * 1) A client blocks with BLPOP or similar blocking operation.
5403  * 2) The master migrates the hash slot elsewhere or turns into a slave.
5404  * 3) The client may remain blocked forever (or up to the max timeout time)
5405  * waiting for a key change that will never happen.
5406  *
5407  * If the client is found to be blocked on a hash slot this node no
5408  * longer handles, the client is sent a redirection error, and the function
5409  * returns 1. Otherwise 0 is returned and no operation is performed. */
5410 int clusterRedirectBlockedClientIfNeeded(client *c) {
5411  if (c->flags & CLIENT_BLOCKED && c->btype == BLOCKED_LIST) {
5412  dictEntry *de;
5413  dictIterator *di;
5414 
5415  /* If the cluster is down, unblock the client with the right error. */
5416  if (server.cluster->state == CLUSTER_FAIL) {
5417  clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
5418  return 1;
5419  }
5420 
5421  di = dictGetIterator(c->bpop.keys);
5422  while((de = dictNext(di)) != NULL) {
5423  robj *key = dictGetKey(de);
5424  int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
5425  clusterNode *node = server.cluster->slots[slot];
5426 
5427  /* We send an error and unblock the client if:
5428  * 1) The slot is unassigned, emitting a cluster down error.
5429  * 2) The slot is not handled by this node, nor being imported. */
5430  if (node != myself &&
5431  server.cluster->importing_slots_from[slot] == NULL)
5432  {
5433  if (node == NULL) {
5434  clusterRedirectClient(c,NULL,0,
5435  CLUSTER_REDIR_DOWN_UNBOUND);
5436  } else {
5437  clusterRedirectClient(c,node,slot,
5438  CLUSTER_REDIR_MOVED);
5439  }
5440  return 1;
5441  }
5442  }
5443  dictReleaseIterator(di);
5444  }
5445  return 0;
5446 }