12. Nacos service registration source code analysis: service registration on the nacos-server
2022-07-03 11:02:00 【May your smile become a breeze】
Nacos service registration source code analysis: nacos-server service registration
Overview
Continuing from the previous post, this post covers how the nacos-server side processes requests. It includes the following:
- How nacos-server stores service information
- How nacos-server handles a registration request from nacos-client and saves the service instance
- How nacos-server handles a heartbeat request from nacos-client
Service information storage structure

Storage structure core code
// ## ServiceManager
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map(namespace, Map(group::serviceName, Service)).
     * First level: namespace.
     * Second level: group::serviceName, which locates the Service.
     */
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
}

// ## Service: each service holds a clusterMap
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {

    // clusterName -> Cluster
    private Map<String, Cluster> clusterMap = new HashMap<>();
}

// ## Cluster
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {

    // Persistent instances
    private Set<Instance> persistentInstances = new HashSet<>();

    // Ephemeral (temporary) instances
    private Set<Instance> ephemeralInstances = new HashSet<>();
}
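To make the nesting concrete, here is a minimal sketch of the same three-level layout (namespace, then grouped service name, then cluster, then instance sets). The classes `RegistrySketch`, `SimpleService` and `SimpleCluster` are hypothetical simplifications, not Nacos classes, and instances are reduced to plain `ip:port` strings.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Cluster: two instance sets per cluster.
class SimpleCluster {
    final Set<String> persistentInstances = new HashSet<>();
    final Set<String> ephemeralInstances = new HashSet<>();
}

// Hypothetical stand-in for Service: clusterName -> cluster.
class SimpleService {
    final Map<String, SimpleCluster> clusterMap = new HashMap<>();
}

class RegistrySketch {
    // namespace -> (grouped service name -> service)
    private final Map<String, Map<String, SimpleService>> serviceMap = new ConcurrentHashMap<>();

    void register(String namespace, String groupedName, String clusterName,
                  String ipPort, boolean ephemeral) {
        SimpleService service = serviceMap
                .computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>())
                .computeIfAbsent(groupedName, name -> new SimpleService());
        // The inner maps and sets are not thread-safe, so guard them per service.
        synchronized (service) {
            SimpleCluster cluster = service.clusterMap
                    .computeIfAbsent(clusterName, c -> new SimpleCluster());
            (ephemeral ? cluster.ephemeralInstances : cluster.persistentInstances).add(ipPort);
        }
    }

    int countInstances(String namespace, String groupedName, String clusterName, boolean ephemeral) {
        Map<String, SimpleService> services = serviceMap.get(namespace);
        if (services == null) {
            return 0;
        }
        SimpleService service = services.get(groupedName);
        if (service == null) {
            return 0;
        }
        synchronized (service) {
            SimpleCluster cluster = service.clusterMap.get(clusterName);
            if (cluster == null) {
                return 0;
            }
            return (ephemeral ? cluster.ephemeralInstances : cluster.persistentInstances).size();
        }
    }
}
```

Looking up an instance therefore takes three keys in order: the namespace, the grouped service name, and the cluster name. The ephemeral flag then selects which of the two sets to read.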
Server-side handling of service registration: InstanceController.register
Receiving the service registration request

Service registration
// serviceManager.registerInstance(namespaceId, serviceName, instance);
// Registers an instance with a service in AP mode.
// If the service or cluster does not exist, it is created.
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // Create the Service if absent: namespaceId=public; [email protected]@system
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // Add the service instance
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
Creating the Service
// createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// Values in this trace: namespaceId=public; [email protected]@system; cluster=null
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        // On the first registration the service does not exist yet, so create the Service object
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}
// Save and initialize the Service
// putServiceAndInit(service);
private void putServiceAndInit(Service service) throws NacosException {
    // Put the service into serviceMap (see putService)
    putService(service);
    // Initialize the service: starts the client heartbeat check task
    service.init();
    // consistencyService is a DelegateConsistencyServiceImpl.
    // Register the service as a listener with the consistency service,
    // once for the ephemeral instance list and once for the persistent one.
    // Ephemeral listener key: com.alibaba.nacos.naming.iplist.ephemeral.public##[email protected]@system
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    // Persistent listener key: com.alibaba.nacos.naming.iplist.public##[email protected]@system
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
/**
 * Put service into manager.
 *
 * @param service service
 */
public void putService(Service service) {
    // Double-checked locking: create the per-namespace map only if it is still absent
    if (!serviceMap.containsKey(service.getNamespaceId())) {
        synchronized (putServiceLock) {
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
            }
        }
    }
    // serviceMap: namespaceId -> Map(serviceName -> service)
    serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
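putService guards the creation of the per-namespace map with double-checked locking. As a sketch only, the snippet below reproduces that pattern on a simplified map of strings and sets it next to a `computeIfAbsent` variant, which performs the same get-or-create step atomically on a `ConcurrentHashMap`. The class `NamespaceMapSketch` and its method names are hypothetical; the second variant is an alternative for comparison, not what the Nacos code shown above does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

class NamespaceMapSketch {
    // namespaceId -> (serviceName -> service); the service is a plain String here
    private final Map<String, Map<String, String>> serviceMap = new ConcurrentHashMap<>();
    private final Object putServiceLock = new Object();

    // Same shape as putService: check, lock, check again, then create.
    void putWithDoubleCheck(String namespaceId, String serviceName, String service) {
        if (!serviceMap.containsKey(namespaceId)) {
            synchronized (putServiceLock) {
                if (!serviceMap.containsKey(namespaceId)) {
                    serviceMap.put(namespaceId, new ConcurrentSkipListMap<>());
                }
            }
        }
        serviceMap.get(namespaceId).put(serviceName, service);
    }

    // Alternative: let the concurrent map do the atomic get-or-create.
    void putWithComputeIfAbsent(String namespaceId, String serviceName, String service) {
        serviceMap.computeIfAbsent(namespaceId, ns -> new ConcurrentSkipListMap<>())
                .put(serviceName, service);
    }

    String get(String namespaceId, String serviceName) {
        Map<String, String> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }
}
```

Both variants leave the map in the same state. The inner `ConcurrentSkipListMap` keeps service names sorted, which is the map type putService creates.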
Adding the service instance
// Add service instances to the Service
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    // key: com.alibaba.nacos.naming.iplist.ephemeral.public##[email protected]@system
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    Service service = getService(namespaceId, serviceName);
    synchronized (service) {
        // Build the full instance list, including the new instances
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // Write the instance list to the consistencyService
        consistencyService.put(key, instances);
    }
}
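The listeners registered in putServiceAndInit and the put call in addInstance meet at the same key. The sketch below illustrates that relationship under two assumptions: that the key format matches the example keys shown in the comments above, and that the store notifies registered listeners whenever a value is put. `ConsistencyStoreSketch` and its `Listener` interface are hypothetical, and instances are again plain `ip:port` strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class ConsistencyStoreSketch {
    // Hypothetical listener callback, invoked with the new instance list.
    interface Listener {
        void onChange(String key, List<String> instances);
    }

    private final Map<String, List<String>> data = new ConcurrentHashMap<>();
    private final Map<String, List<Listener>> listeners = new ConcurrentHashMap<>();

    // Rebuilds the key shape seen in the comments above (an assumption, not the Nacos KeyBuilder).
    static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
        String prefix = "com.alibaba.nacos.naming.iplist.";
        return prefix + (ephemeral ? "ephemeral." : "") + namespaceId + "##" + serviceName;
    }

    void listen(String key, Listener listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    void put(String key, List<String> instances) {
        // Store an immutable snapshot so listeners cannot modify shared state.
        List<String> snapshot = Collections.unmodifiableList(new ArrayList<>(instances));
        data.put(key, snapshot);
        for (Listener listener : listeners.getOrDefault(key, Collections.emptyList())) {
            listener.onChange(key, snapshot);
        }
    }

    List<String> get(String key) {
        return data.getOrDefault(key, Collections.emptyList());
    }
}
```

Because ephemeral and persistent lists use different keys, a put for one kind of instance only reaches the listener registered under that key.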
Server-side handling of client heartbeats: InstanceController.beat
The controller receives the heartbeat request
// The controller logic is fairly straightforward
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // clientBeatInterval: 5000
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    // beat = "" in this trace
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    // DEFAULT
    String clusterName = WebUtils
            .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    // 192.168.31.30
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    // 8081
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    // false in this trace: beat was empty, so clientBeat is null and this branch is skipped
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    // public
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // [email protected]@system
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    // Look up the instance
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    // If the instance is not found, register it again with the registry
    if (instance == null) {
        if (clientBeat == null) {
            // Without a beat payload there is not enough data to re-register
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    Service service = serviceManager.getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    // ### The actual heartbeat handling happens here (see processClientBeat)
    service.processClientBeat(clientBeat);
    // Build the response
    result.put(CommonParams.CODE, NamingResponseCode.OK);
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
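The branching in beat comes down to three cases: the instance is known, the instance is unknown but the request carries a full beat payload (so the server re-registers it), or the instance is unknown and there is no payload (so the server answers RESOURCE_NOT_FOUND). A minimal sketch of that decision follows, with a hypothetical `BeatFlowSketch` class and a hypothetical `BeatResult` enum standing in for the numeric response codes.

```java
import java.util.HashMap;
import java.util.Map;

class BeatFlowSketch {
    // Hypothetical result type; the real controller writes a numeric code into the JSON response.
    enum BeatResult { OK, RESOURCE_NOT_FOUND }

    // "ip:port" -> timestamp of the last heartbeat
    private final Map<String, Long> lastBeatByInstance = new HashMap<>();

    BeatResult beat(String ip, int port, boolean hasBeatPayload, long now) {
        String instanceId = ip + ":" + port;
        if (!lastBeatByInstance.containsKey(instanceId)) {
            if (!hasBeatPayload) {
                // Unknown instance and nothing to rebuild it from.
                return BeatResult.RESOURCE_NOT_FOUND;
            }
            // Compensation: rebuild the registration from the beat payload.
            register(instanceId, now);
        }
        // Known (or just re-registered) instance: record the heartbeat.
        lastBeatByInstance.put(instanceId, now);
        return BeatResult.OK;
    }

    private void register(String instanceId, long now) {
        lastBeatByInstance.put(instanceId, now);
    }

    boolean isRegistered(String ip, int port) {
        return lastBeatByInstance.containsKey(ip + ":" + port);
    }

    Long lastBeat(String ip, int port) {
        return lastBeatByInstance.get(ip + ":" + port);
    }
}
```

In the sketch, as in the controller, a beat without a payload never creates an instance; only a full payload carries enough data (weight, metadata, ephemeral flag) to rebuild one.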
Heartbeat processing: service.processClientBeat(clientBeat)
// processClientBeat builds a ClientBeatProcessor to handle the heartbeat request
public void processClientBeat(final RsInfo rsInfo) {
    // Client heartbeat processor
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    // Schedule the processor for immediate execution
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
// ClientBeatProcessor.run
@Override
public void run() {
    Service service = this.service;
    String ip = rsInfo.getIp(); // 192.168.31.30
    String clusterName = rsInfo.getCluster(); // DEFAULT
    int port = rsInfo.getPort(); // 8081
    // Get the cluster of this service
    Cluster cluster = service.getClusterMap().get(clusterName);
    // Get all ephemeral instances in the cluster
    List<Instance> instances = cluster.allIPs(true);
    for (Instance instance : instances) {
        // Find the instance that sent this heartbeat
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            // Update the time of the last heartbeat
            instance.setLastBeat(System.currentTimeMillis());
            if (!instance.isMarked()) {
                if (!instance.isHealthy()) {
                    // The instance was unhealthy, so mark it healthy again
                    instance.setHealthy(true);
                    // Publish a service-changed event
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}
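The loop in ClientBeatProcessor.run has two effects: it always refreshes the last-beat timestamp of the matching instance, and it publishes a change event only when an unmarked instance goes from unhealthy back to healthy. The sketch below isolates that rule. `BeatInstance` and `BeatProcessorSketch` are hypothetical, and the boolean return value stands in for the call to the push service.

```java
import java.util.List;

// Hypothetical, simplified instance with only the fields the heartbeat touches.
class BeatInstance {
    final String ip;
    final int port;
    long lastBeat;
    boolean healthy;
    boolean marked;

    BeatInstance(String ip, int port, boolean healthy, boolean marked) {
        this.ip = ip;
        this.port = port;
        this.healthy = healthy;
        this.marked = marked;
    }
}

class BeatProcessorSketch {
    // Returns true when a service-changed notification would be published.
    static boolean process(List<BeatInstance> instances, String ip, int port, long now) {
        boolean changed = false;
        for (BeatInstance instance : instances) {
            if (instance.ip.equals(ip) && instance.port == port) {
                // Every heartbeat refreshes the timestamp.
                instance.lastBeat = now;
                // Only an unmarked, unhealthy instance is flipped back to healthy.
                if (!instance.marked && !instance.healthy) {
                    instance.healthy = true;
                    changed = true;
                }
            }
        }
        return changed;
    }
}
```

A heartbeat from an instance that is already healthy only updates the timestamp, so steady-state heartbeats do not trigger a notification.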