当前位置:网站首页>Gossip about redis source code 73
Gossip about redis source code 73
2022-07-03 23:23:00 【Tao song remains the same】
Load the information in the configuration file , Non emphasis :
/* Load the cluster config from 'filename'.
*
* If the file does not exist or is zero-length (this may happen because
* when we lock the nodes.conf file, we create a zero-length one for the
* sake of locking if it does not already exist), C_ERR is returned.
* If the configuration was loaded from the file, C_OK is returned. */
int clusterLoadConfig(char *filename) {
FILE *fp = fopen(filename,"r");
struct stat sb;
char *line;
int maxline, j;
if (fp == NULL) {
if (errno == ENOENT) {
return C_ERR;
} else {
serverLog(LL_WARNING,
"Loading the cluster node config from %s: %s",
filename, strerror(errno));
exit(1);
}
}
/* Check if the file is zero-length: if so return C_ERR to signal
* we have to write the config. */
if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
fclose(fp);
return C_ERR;
}
/* Parse the file. Note that single lines of the cluster config file can
* be really long as they include all the hash slots of the node.
* This means in the worst possible case, half of the Redis slots will be
* present in a single line, possibly in importing or migrating state, so
* together with the node ID of the sender/receiver.
*
* To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
maxline = 1024+CLUSTER_SLOTS*128;
line = zmalloc(maxline);
while(fgets(line,maxline,fp) != NULL) {
int argc;
sds *argv;
clusterNode *n, *master;
char *p, *s;
/* Skip blank lines, they can be created either by users manually
* editing nodes.conf or by the config writing process if stopped
* before the truncate() call. */
if (line[0] == '\n' || line[0] == '\0') continue;
/* Split the line into arguments for processing. */
argv = sdssplitargs(line,&argc);
if (argv == NULL) goto fmterr;
/* Handle the special "vars" line. Don't pretend it is the last
* line even if it actually is when generated by Redis. */
if (strcasecmp(argv[0],"vars") == 0) {
if (!(argc % 2)) goto fmterr;
for (j = 1; j < argc; j += 2) {
if (strcasecmp(argv[j],"currentEpoch") == 0) {
server.cluster->currentEpoch =
strtoull(argv[j+1],NULL,10);
} else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
server.cluster->lastVoteEpoch =
strtoull(argv[j+1],NULL,10);
} else {
serverLog(LL_WARNING,
"Skipping unknown cluster config variable '%s'",
argv[j]);
}
}
sdsfreesplitres(argv,argc);
continue;
}
/* Regular config lines have at least eight fields */
if (argc < 8) {
sdsfreesplitres(argv,argc);
goto fmterr;
}
/* Create this node if it does not exist */
n = clusterLookupNode(argv[0]);
if (!n) {
n = createClusterNode(argv[0],0);
clusterAddNode(n);
}
/* Address and port */
if ((p = strrchr(argv[1],':')) == NULL) {
sdsfreesplitres(argv,argc);
goto fmterr;
}
*p = '\0';
memcpy(n->ip,argv[1],strlen(argv[1])+1);
char *port = p+1;
char *busp = strchr(port,'@');
if (busp) {
*busp = '\0';
busp++;
}
n->port = atoi(port);
/* In older versions of nodes.conf the "@busport" part is missing.
* In this case we set it to the default offset of 10000 from the
* base port. */
n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;
/* The plaintext port for client in a TLS cluster (n->pport) is not
* stored in nodes.conf. It is received later over the bus protocol. */
/* Parse flags */
p = s = argv[2];
while(p) {
p = strchr(s,',');
if (p) *p = '\0';
if (!strcasecmp(s,"myself")) {
serverAssert(server.cluster->myself == NULL);
myself = server.cluster->myself = n;
n->flags |= CLUSTER_NODE_MYSELF;
} else if (!strcasecmp(s,"master")) {
n->flags |= CLUSTER_NODE_MASTER;
} else if (!strcasecmp(s,"slave")) {
n->flags |= CLUSTER_NODE_SLAVE;
} else if (!strcasecmp(s,"fail?")) {
n->flags |= CLUSTER_NODE_PFAIL;
} else if (!strcasecmp(s,"fail")) {
n->flags |= CLUSTER_NODE_FAIL;
n->fail_time = mstime();
} else if (!strcasecmp(s,"handshake")) {
n->flags |= CLUSTER_NODE_HANDSHAKE;
} else if (!strcasecmp(s,"noaddr")) {
n->flags |= CLUSTER_NODE_NOADDR;
} else if (!strcasecmp(s,"nofailover")) {
n->flags |= CLUSTER_NODE_NOFAILOVER;
} else if (!strcasecmp(s,"noflags")) {
/* nothing to do */
} else {
serverPanic("Unknown flag in redis cluster config file");
}
if (p) s = p+1;
}
/* Get master if any. Set the master and populate master's
* slave list. */
if (argv[3][0] != '-') {
master = clusterLookupNode(argv[3]);
if (!master) {
master = createClusterNode(argv[3],0);
clusterAddNode(master);
}
n->slaveof = master;
clusterNodeAddSlave(master,n);
}
/* Set ping sent / pong received timestamps */
if (atoi(argv[4])) n->ping_sent = mstime();
if (atoi(argv[5])) n->pong_received = mstime();
/* Set configEpoch for this node. */
n->configEpoch = strtoull(argv[6],NULL,10);
/* Populate hash slots served by this instance. */
for (j = 8; j < argc; j++) {
int start, stop;
if (argv[j][0] == '[') {
/* Here we handle migrating / importing slots */
int slot;
char direction;
clusterNode *cn;
p = strchr(argv[j],'-');
serverAssert(p != NULL);
*p = '\0';
direction = p[1]; /* Either '>' or '<' */
slot = atoi(argv[j]+1);
if (slot < 0 || slot >= CLUSTER_SLOTS) {
sdsfreesplitres(argv,argc);
goto fmterr;
}
p += 3;
cn = clusterLookupNode(p);
if (!cn) {
cn = createClusterNode(p,0);
clusterAddNode(cn);
}
if (direction == '>') {
server.cluster->migrating_slots_to[slot] = cn;
} else {
server.cluster->importing_slots_from[slot] = cn;
}
continue;
} else if ((p = strchr(argv[j],'-')) != NULL) {
*p = '\0';
start = atoi(argv[j]);
stop = atoi(p+1);
} else {
start = stop = atoi(argv[j]);
}
if (start < 0 || start >= CLUSTER_SLOTS ||
stop < 0 || stop >= CLUSTER_SLOTS)
{
sdsfreesplitres(argv,argc);
goto fmterr;
}
while(start <= stop) clusterAddSlot(n, start++);
}
sdsfreesplitres(argv,argc);
}
/* Config sanity check */
if (server.cluster->myself == NULL) goto fmterr;
zfree(line);
fclose(fp);
serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);
/* Something that should never happen: currentEpoch smaller than
* the max epoch found in the nodes configuration. However we handle this
* as some form of protection against manual editing of critical files. */
if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
server.cluster->currentEpoch = clusterGetMaxEpoch();
}
return C_OK;
fmterr:
serverLog(LL_WARNING,
"Unrecoverable error: corrupted cluster config file.");
zfree(line);
if (fp) fclose(fp);
exit(1);
}
/* Cluster node configuration is exactly the same as CLUSTER NODES output.
*
* This function writes the node config and returns 0, on error -1
* is returned.
*
* Note: we need to write the file in an atomic way from the point of view
* of the POSIX filesystem semantics, so that if the server is stopped
* or crashes during the write, we'll end with either the old file or the
* new one. Since we have the full payload to write available we can use
* a single write to write the whole file. If the pre-existing file was
* bigger we pad our payload with newlines that are anyway ignored and truncate
* the file afterward. */
int clusterSaveConfig(int do_fsync) {
sds ci;
size_t content_size;
struct stat sb;
int fd;
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;
/* Get the nodes description and concatenate our "vars" directive to
* save currentEpoch and lastVoteEpoch. */
ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE, 0);
ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
(unsigned long long) server.cluster->currentEpoch,
(unsigned long long) server.cluster->lastVoteEpoch);
content_size = sdslen(ci);
if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
== -1) goto err;
/* Pad the new payload if the existing file length is greater. */
if (fstat(fd,&sb) != -1) {
if (sb.st_size > (off_t)content_size) {
ci = sdsgrowzero(ci,sb.st_size);
memset(ci+content_size,'\n',sb.st_size-content_size);
}
}
if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
if (do_fsync) {
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
if (fsync(fd) == -1) goto err;
}
/* Truncate the file if needed to remove the final \n padding that
* is just garbage. */
if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
/* ftruncate() failing is not a critical error. */
}
close(fd);
sdsfree(ci);
return 0;
err:
if (fd != -1) close(fd);
sdsfree(ci);
return -1;
}
void clusterSaveConfigOrDie(int do_fsync) {
if (clusterSaveConfig(do_fsync) == -1) {
serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
exit(1);
}
}
/* Lock the cluster config using flock(), and leaks the file descriptor used to
* acquire the lock so that the file will be locked forever.
*
* This works because we always update nodes.conf with a new version
* in-place, reopening the file, and writing to it in place (later adjusting
* the length with ftruncate()).
*
* On success C_OK is returned, otherwise an error is logged and
* the function returns C_ERR to signal a lock was not acquired. */
int clusterLockConfig(char *filename) {
/* flock() does not exist on Solaris
* and a fcntl-based solution won't help, as we constantly re-open that file,
* which will release _all_ locks anyway
*/
#if !defined(__sun)
/* To lock it, we need to open the file in a way it is created if
* it does not exist, otherwise there is a race condition with other
* processes. */
int fd = open(filename,O_WRONLY|O_CREAT|O_CLOEXEC,0644);
if (fd == -1) {
serverLog(LL_WARNING,
"Can't open %s in order to acquire a lock: %s",
filename, strerror(errno));
return C_ERR;
}
if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
if (errno == EWOULDBLOCK) {
serverLog(LL_WARNING,
"Sorry, the cluster configuration file %s is already used "
"by a different Redis Cluster node. Please make sure that "
"different nodes use different cluster configuration "
"files.", filename);
} else {
serverLog(LL_WARNING,
"Impossible to lock %s: %s", filename, strerror(errno));
}
close(fd);
return C_ERR;
}
/* Lock acquired: leak the 'fd' by not closing it, so that we'll retain the
* lock to the file as long as the process exists.
*
* After fork, the child process will get the fd opened by the parent process,
* we need save `fd` to `cluster_config_file_lock_fd`, so that in redisFork(),
* it will be closed in the child process.
* If it is not closed, when the main process is killed -9, but the child process
* (redis-aof-rewrite) is still alive, the fd(lock) will still be held by the
* child process, and the main process will fail to get lock, means fail to start. */
server.cluster_config_file_lock_fd = fd;
#else
UNUSED(filename);
#endif /* __sun */
return C_OK;
}
/* Derives our ports to be announced in the cluster bus. */
void deriveAnnouncedPorts(int *announced_port, int *announced_pport,
int *announced_cport) {
int port = server.tls_cluster ? server.tls_port : server.port;
/* Default announced ports. */
*announced_port = port;
*announced_pport = server.tls_cluster ? server.port : 0;
*announced_cport = port + CLUSTER_PORT_INCR;
/* Config overriding announced ports. */
if (server.tls_cluster && server.cluster_announce_tls_port) {
*announced_port = server.cluster_announce_tls_port;
*announced_pport = server.cluster_announce_port;
} else if (server.cluster_announce_port) {
*announced_port = server.cluster_announce_port;
}
if (server.cluster_announce_bus_port) {
*announced_cport = server.cluster_announce_bus_port;
}
}
/* Some flags (currently just the NOFAILOVER flag) may need to be updated
* in the "myself" node based on the current configuration of the node,
* that may change at runtime via CONFIG SET. This function changes the
* set of flags in myself->flags accordingly. */
void clusterUpdateMyselfFlags(void) {
int oldflags = myself->flags;
int nofailover = server.cluster_slave_no_failover ?
CLUSTER_NODE_NOFAILOVER : 0;
myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
myself->flags |= nofailover;
if (myself->flags != oldflags) {
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}
}
void clusterInit(void) {
int saveconf = 0;
server.cluster = zmalloc(sizeof(clusterState));
server.cluster->myself = NULL;
server.cluster->currentEpoch = 0;
server.cluster->state = CLUSTER_FAIL;
server.cluster->size = 1;
server.cluster->todo_before_sleep = 0;
server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
server.cluster->nodes_black_list =
dictCreate(&clusterNodesBlackListDictType,NULL);
server.cluster->failover_auth_time = 0;
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_rank = 0;
server.cluster->failover_auth_epoch = 0;
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
server.cluster->lastVoteEpoch = 0;
for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
server.cluster->stats_bus_messages_sent[i] = 0;
server.cluster->stats_bus_messages_received[i] = 0;
}
server.cluster->stats_pfail_nodes = 0;
memset(server.cluster->slots,0, sizeof(server.cluster->slots));
clusterCloseAllSlots();
/* Lock the cluster config file to make sure every node uses
* its own nodes.conf. */
server.cluster_config_file_lock_fd = -1;
if (clusterLockConfig(server.cluster_configfile) == C_ERR)
exit(1);
/* Load or create a new nodes configuration. */
if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
/* No configuration found. We will just use the random name provided
* by the createClusterNode() function. */
myself = server.cluster->myself =
createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
myself->name);
clusterAddNode(myself);
saveconf = 1;
}
if (saveconf) clusterSaveConfigOrDie(1);
/* We need a listening TCP port for our cluster messaging needs. */
server.cfd.count = 0;
/* Port sanity check II
* The other handshake port check is triggered too late to stop
* us from trying to use a too-high cluster port number. */
int port = server.tls_cluster ? server.tls_port : server.port;
if (port > (65535-CLUSTER_PORT_INCR)) {
serverLog(LL_WARNING, "Redis port number too high. "
"Cluster communication port is 10,000 port "
"numbers higher than your Redis port. "
"Your Redis port number must be 55535 or less.");
exit(1);
}
if (listenToPort(port+CLUSTER_PORT_INCR, &server.cfd) == C_ERR) {
exit(1);
}
if (createSocketAcceptHandler(&server.cfd, clusterAcceptHandler) != C_OK) {
serverPanic("Unrecoverable error creating Redis Cluster socket accept handler.");
}
/* The slots -> keys map is a radix tree. Initialize it here. */
server.cluster->slots_to_keys = raxNew();
memset(server.cluster->slots_keys_count,0,
sizeof(server.cluster->slots_keys_count));
/* Set myself->port/cport/pport to my listening ports, we'll just need to
* discover the IP address via MEET messages. */
deriveAnnouncedPorts(&myself->port, &myself->pport, &myself->cport);
server.cluster->mf_end = 0;
resetManualFailover();
clusterUpdateMyselfFlags();
}
边栏推荐
- 2022 Guangdong Provincial Safety Officer a certificate third batch (main person in charge) simulated examination and Guangdong Provincial Safety Officer a certificate third batch (main person in charg
- [Android reverse] use DB browser to view and modify SQLite database (download DB browser installation package | install DB browser tool)
- The 2022 global software R & D technology conference was released, and world-class masters such as Turing prize winners attended
- Yyds dry goods inventory [practical] simply encapsulate JS cycle with FP idea~
- [Happy Valentine's day] "I still like you very much, like sin ² a+cos ² A consistent "(white code in the attached table)
- 2.14 summary
- Les sociétés de valeurs mobilières dont la Commission d'ouverture d'un compte d'actions est la plus faible ont ce que tout le monde recommande.
- Comment obtenir une commission préférentielle pour l'ouverture d'un compte en bourse? Est - ce que l'ouverture d'un compte en ligne est sécurisée?
- C deep anatomy - the concept of keywords and variables # dry inventory #
- Apple released a supplementary update to MacOS Catalina 10.15.5, which mainly fixes security vulnerabilities
猜你喜欢

webAssembly
![[network security] what is emergency response? What indicators should you pay attention to in emergency response?](/img/ff/c733ffbb922760910ab09af3ae2886.jpg)
[network security] what is emergency response? What indicators should you pay attention to in emergency response?

Hcip 13th day notes

Blue Bridge Cup -- guess age

Interesting 10 CMD commands

2/14 (regular expression, sed streaming editor)

In 2022, 6G development has indeed warmed up

Exclusive download! Alibaba cloud native brings 10 + technical experts to bring "new possibilities of cloud native and cloud future"

Hcip day 16 notes

The reason why the computer runs slowly and how to solve it
随机推荐
Comparable interface and comparator interface
D26: the nearest number (translation + solution)
Pointer concept & character pointer & pointer array yyds dry inventory
Interpretation of corolla sub low configuration, three cylinder power configuration, CVT fuel saving and smooth, safety configuration is in place
Programming language (2)
NPM script
Ningde times and BYD have refuted rumors one after another. Why does someone always want to harm domestic brands?
Xiangong intelligent obtained hundreds of millions of yuan of b-round financing to accelerate the process of building non-standard solutions with standardized products
Op amp related - link
2/14 (regular expression, sed streaming editor)
The 2022 global software R & D technology conference was released, and world-class masters such as Turing prize winners attended
Hcip day 16 notes
Pyqt5 sensitive word detection tool production, operator's Gospel
Recursion and recursion
How about opening an account at Hengtai securities? Is it safe?
C3p0 connection MySQL 8.0.11 configuration problem
炒股開戶傭金優惠怎麼才能獲得,網上開戶安全嗎
Sword finger offer day 4 (Sword finger offer 03. duplicate numbers in the array, sword finger offer 53 - I. find the number I in the sorted array, and the missing numbers in sword finger offer 53 - ii
How to prevent malicious crawling of information by one-to-one live broadcast source server
Ningde times and BYD have refuted rumors one after another. Why does someone always want to harm domestic brands?