当前位置:网站首页>12. Nacos server service registration of source code analysis of Nacos service registration
12. Nacos server service registration of source code analysis of Nacos service registration
2022-07-03 11:02:00 【May your smile become a breeze】
nacos Service registration source code analysis nacos-server Service registration
primary coverage
Continue with the last blog post , This post mainly introduces nacos-server Server processing , Include the following :
nacos-server
How to store service informationnacos-server
receivenacos-client
Processing of registration request , How to save service instancesnacos-server
receivenacos-client
Processing of heartbeat request
Service information storage structure
Storage structure core code
// ## ServiceManager
public class ServiceManager implements RecordListener<Service> {
/** * Map(namespace, Map(group::serviceName, Service)). * first floor :namespace * The second floor :group::serviceName Locate the Service */
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
}
// ## Service : The service stores clusterMap
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
// clusterName -> Cluster
private Map<String, Cluster> clusterMap = new HashMap<>();
}
// ## Cluster
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
// Store persistent nodes
private Set<Instance> persistentInstances = new HashSet<>();
// Storage temporary nodes
private Set<Instance> ephemeralInstances = new HashSet<>();
}
The server handles service registration InstanceController.register
Received service registration information
Service registration
// serviceManager.registerInstance(namespaceId, serviceName, instance);
// Sign up for a AP Mode of service
// If the service or cluster does not exist , Create
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// establish Service: namespaceId=public; [email protected]@system
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
// Add service instance
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
establish Service
// createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// namespaceId=public; [email protected]@system; cluster=null
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
Service service = getService(namespaceId, serviceName);
if (service == null) {
// The first registration is empty , Create Service object
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
// Save and initialize Service
// putServiceAndInit(service);
private void putServiceAndInit(Service service) throws NacosException {
// Put the service into serviceMap in , See below
putService(service);
// Service initialization : Start the client heartbeat detection task
service.init();
// consistencyService yes DelegateConsistencyServiceImpl
// Distributed consistency services , Register to listen , A lasting , A temporary
// Persistent monitoring key: com.alibaba.nacos.naming.iplist.ephemeral.public##[email protected]@system
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
// Temporary monitoring key: com.alibaba.nacos.naming.iplist.public##[email protected]@system
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
/** * Put service into manager. * @param service service */
public void putService(Service service) {
// doubleCheck The way , take Service Put in serviceMap in
if (!serviceMap.containsKey(service.getNamespaceId())) {
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
}
}
}
// serviceMap: namespaceId -> Map(serviceName -> service )
serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
Add service instance
// Add service instances to Service in
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
// key: com.alibaba.nacos.naming.iplist.ephemeral.public##[email protected]@system
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
synchronized (service) {
// Construct an instance list
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// Save to consistencyService in
consistencyService.put(key, instances);
}
}
The server receives the client heartbeat report InstanceController.beat
Controller Receive heartbeat request
// It's simpler here
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
ObjectNode result = JacksonUtils.createEmptyJsonNode();
// clientBeatInterval: 5000
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
// beat = "";
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
RsInfo clientBeat = null;
if (StringUtils.isNotBlank(beat)) {
clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
}
// DEFAULT
String clusterName = WebUtils
.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
// 192.168.31.30
String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
// 8081
int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
// false
if (clientBeat != null) {
if (StringUtils.isNotBlank(clientBeat.getCluster())) {
clusterName = clientBeat.getCluster();
} else {
// fix #2533
clientBeat.setCluster(clusterName);
}
ip = clientBeat.getIp();
port = clientBeat.getPort();
}
// public
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// [email protected]@system
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
// Get instance information
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
// If the instance information is empty , Then re register to the Registration Center
if (instance == null) {
if (clientBeat == null) {
result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
return result;
}
Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found: " + serviceName + "@" + namespaceId);
}
if (clientBeat == null) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(clusterName);
}
// ### Officially handle heartbeat , See below
service.processClientBeat(clientBeat);
// Process the returned information
result.put(CommonParams.CODE, NamingResponseCode.OK);
if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
}
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}
Heartbeat processing service.processClientBeat(clientBeat)
// processClientBeat Construct a ClientBeatProcessor To handle heartbeat requests
public void processClientBeat(final RsInfo rsInfo) {
// Client heartbeat processor
ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
clientBeatProcessor.setService(this);
clientBeatProcessor.setRsInfo(rsInfo);
// Execution processor
HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
// ClientBeatProcessor.run
@Override
public void run() {
Service service = this.service;
String ip = rsInfo.getIp(); // 192.168.31.30
String clusterName = rsInfo.getCluster(); // DEFAULT
int port = rsInfo.getPort(); // 8081
// Get the cluster information under the service
Cluster cluster = service.getClusterMap().get(clusterName);
// Get all instances under the cluster
List<Instance> instances = cluster.allIPs(true);
for (Instance instance : instances) {
// Traversing instances
// If it is an instance of reporting heartbeat
if (instance.getIp().equals(ip) && instance.getPort() == port) {
// Update the last heartbeat upload time
instance.setLastBeat(System.currentTimeMillis());
if (!instance.isMarked()) {
if (!instance.isHealthy()) {
// If the instance is unhealthy , Then update the health status
instance.setHealthy(true);
// Expose service information change events
getPushService().serviceChanged(service);
}
}
}
}
}
边栏推荐
- In the middle of the year, I have prepared a small number of automated interview questions. Welcome to the self-test
- 如何监测服务器主机的进出流量?
- MAUI Developer Day in GCR
- Game test related tests a hero's skills (spring moves are asked more questions)
- Some abilities can't be learned from work. Look at this article, more than 90% of peers
- Solution: jupyter notebook does not pop up the default browser
- Cache routing component
- 年中了,准备了少量的自动化面试题,欢迎来自测
- Qt:qss custom qscrollbar instance
- MySQL checks for automatic updates at 0:00 every day
猜你喜欢
硬 货 | 一改测试步骤代码就全写?为什么不试试用 Yaml实现数据驱动?
UI自动化测试如何走出困境?价值又如何体现?
Bidding website architecture project progress -- Network Security
Use ml.net+onnx pre training model to liven the classic "Huaqiang buys melons" in station B
MySQL checks for automatic updates at 0:00 every day
17K薪资要什么水平?来看看95后测试工程师的面试全过程…
有些能力,是工作中学不来的,看看这篇超过90%同行
那些一门心思研究自动化测试的人,后来怎样了?
Day 7 small exercise
【蓝桥杯选拔赛真题44】Scratch消灭骷髅军团 少儿编程scratch蓝桥杯选拔赛真题讲解
随机推荐
QT:QSS自定义QTableView实例
Small file special
Praise syllogism
The solution that prompts "system group policy prohibits the installation of this device" under win10 system (home version has no group policy)
Uni app learning 1 bottom menu and parent-child components
QT:QSS自定义 QStatusBar实例
Cause: org. apache. ibatis. builder. Builderexception: error parsing SQL mapper configuration problem analysis
Flink < --> Introduction to JDBC +with parameter
Basic usage of sqlmap
Bid -- service commitment -- self summary
《通信软件开发与应用》
logstash备份跟踪上报的数据记录
Nuget add reference error while installing packages
Que se passe - t - il ensuite pour ceux qui se sont concentrés sur les tests automatisés?
UI自动化测试如何走出困境?价值又如何体现?
Word line and bit line
公司里只有一个测试是什么体验?听听他们怎么说吧
. Net core - a queuing system for wechat official account
如何监测服务器主机的进出流量?
Wechat applet training notes 1