当前位置:网站首页>Nacos source code analysis topic (2) - service registration
Nacos source code analysis topic (2) - service registration
2022-08-02 02:46:00 【IT-Lao Niu】
1.引言
服务注册到Nacos
以后,会保存在一个本地注册表
中,其结构如下:
首先最外层是一个Map
,结构为:Map<String, Map<String, Service>>
:
key
:是namespace_id
,起到环境隔离的作用.namespace
下可以有多个group
value
:又是一个Map<String, Service>
,代表分组及组内的服务.一个组内可以有多个服务key
:代表group
分组,不过作为key
时格式是group_name:service_name
value
:分组下的某一个服务,例如userservice
,用户服务.类型为Service
,内部也包含一个Map<String,Cluster>
,一个服务下可以有多个集群key
:集群名称value
:Cluster
类型,包含集群的具体信息.一个集群中可能包含多个实例,也就是具体的节点信息,其中包含一个Set<Instance>
,就是该集群下的实例的集合
-Instance
:实例信息,包含实例的IP
、Port
、健康状态、权重等等信息
每一个服务去注册到Nacos时,就会把信息组织并存入这个Map中.
2.服务注册接口
Nacos
提供了服务注册的API接口,客户端只需要向该接口发送请求,即可实现服务注册.
接口说明:注册一个实例到Nacos服务.
请求类型:POST
请求路径:/nacos/v1/ns/instance
请求参数:
名称 | 类型 | 是否必选 | 描述 |
---|---|---|---|
ip | 字符串 | 是 | 服务实例IP |
port | int | 是 | 服务实例port |
namespaceId | 字符串 | 否 | 命名空间ID |
weight | double | 否 | 权重 |
enabled | boolean | 否 | 是否上线 |
healthy | boolean | 否 | 是否健康 |
metadata | 字符串 | 否 | 扩展信息 |
clusterName | 字符串 | 否 | 集群名 |
serviceName | 字符串 | 是 | 服务名 |
groupName | 字符串 | 否 | 分组名 |
ephemeral | boolean | 否 | 是否临时实例 |
错误编码:
错误代码 | 描述 | 语义 |
---|---|---|
400 | Bad Request | 客户端请求中的语法错误 |
403 | Forbidden | 没有权限 |
404 | Not Found | 无法找到资源 |
500 | Internal Server Error | 服务器内部错误 |
200 | OK | 正常 |
3.客户端
首先,我们需要找到服务注册的入口.
3.1.NacosServiceRegistryAutoConfiguration
因为Nacos
的客户端是基于SpringBoot
的自动装配实现的,我们可以在nacos-discovery
依赖:
spring-cloud-starter-alibaba-nacos-discovery-2.2.6.RELEASE.jar
这个包中找到Nacos
自动装配信息:
可以看到,有很多个自动配置类被加载了,其中跟服务注册有关的就是NacosServiceRegistryAutoConfiguration
这个类,我们跟入其中.
可以看到,在NacosServiceRegistryAutoConfiguration
这个类中,包含一个跟自动注册有关的Bean:
3.2.NacosAutoServiceRegistration
NacosAutoServiceRegistration
源码如图:
可以看到在初始化时,其父类AbstractAutoServiceRegistration
也被初始化了.
AbstractAutoServiceRegistration
如图:
可以看到它实现了ApplicationListener
接口,监听Spring容器启动过程中的事件.
在监听到WebServerInitializedEvent
(web服务初始化完成)的事件后,执行了bind
方法.
其中的bind方法如下:
public void bind(WebServerInitializedEvent event) {
// 获取 ApplicationContext
ApplicationContext context = event.getApplicationContext();
// 判断服务的 namespace,一般都是null
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace())) {
return;
}
}
// 记录当前 web 服务的端口
this.port.compareAndSet(0, event.getWebServer().getPort());
// 启动当前服务注册流程
this.start();
}
其中的start
方法流程:
public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
// 当前服务处于未运行状态时,才进行初始化
if (!this.running.get()) {
// 发布服务开始注册的事件
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration()));
// ****开始注册****
register();
if (shouldRegisterManagement()) {
registerManagement();
}
// 发布注册完成事件
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
// 服务状态设置为运行状态,基于AtomicBoolean
this.running.compareAndSet(false, true);
}
}
其中最关键的register()
方法就是完成服务注册的关键,代码如下:
protected void register() {
this.serviceRegistry.register(getRegistration());
}
此处的this.serviceRegistry
就是NacosServiceRegistry
:
3.3.NacosServiceRegistry
NacosServiceRegistry
是Spring的ServiceRegistry
接口的实现类,而ServiceRegistry
接口是服务注册、发现的规约接口,定义了register
、deregister
等方法的声明.
而NacosServiceRegistry
对register
的实现如下:
@Override
public void register(Registration registration) {
// 判断serviceId是否为空,也就是spring.application.name不能为空
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
// 获取Nacos的命名服务,其实就是注册中心服务
NamingService namingService = namingService();
// 获取 serviceId 和 Group
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
// 封装服务实例的基本信息,如 cluster-name、是否为临时实例、权重、IP、端口等
Instance instance = getNacosInstanceFromRegistration(registration);
try {
// 开始注册服务
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
if (nacosDiscoveryProperties.isFailFast()) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
rethrowRuntimeException(e);
}
else {
log.warn("Failfast is false. {} register failed...{},", serviceId,
registration.toString(), e);
}
}
}
可以看到方法中最终是调用NamingService
的registerInstance
方法实现注册的.
而NamingService
接口的默认实现就是NacosNamingService
.
3.4.NacosNamingService
NacosNamingService
提供了服务注册、订阅等功能.
其中registerInstance
就是注册服务实例,源码如下:
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
// 检查超时参数是否异常.心跳超时时间(默认15秒)必须大于心跳周期(默认5秒)
NamingUtils.checkInstanceIsLegal(instance);
// 拼接得到新的服务名,格式为:[email protected]@serviceId
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// 判断是否为临时实例,默认为 true.
if (instance.isEphemeral()) {
// 如果是临时实例,需要定时向 Nacos 服务发送心跳
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
// 发送注册服务实例的请求
serverProxy.registerService(groupedServiceName, groupName, instance);
}
最终,由NacosProxy
的registerService
方法,完成服务注册.
代码如下:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
// 组织请求参数
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
// 通过POST请求将上述参数,发送到 /nacos/v1/ns/instance
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
这里提交的信息就是Nacos
服务注册接口需要的完整参数,核心参数有:
- namespace_id:环境
- service_name:服务名称
- group_name:组名称
- cluster_name:集群名称
- ip: 当前实例的ip地址
- port: 当前实例的端口
而在NacosNamingService
的registerInstance
方法中,有一段是与服务心跳有关的代码,我们在后续会继续学习.
4.客户端注册的流程图
如图:
5.服务端
在nacos-console
的模块中,会引入nacos-naming
这个模块
模块结构如下:
其中的com.alibaba.nacos.naming.controllers
包下就有服务注册、发现等相关的各种接口,其中的服务注册是在InstanceController
类中:
5.1.InstanceController
进入InstanceController
类,可以看到一个register
方法,就是服务注册的方法了:
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
// 尝试获取namespaceId
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// 尝试获取serviceName,其格式为 [email protected]@service_name
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
// 解析出实例信息,封装为Instance对象
final Instance instance = parseInstance(request);
// 注册实例
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
这里,进入到了serviceManager.registerInstance()
方法中.
5.2.ServiceManager
ServiceManager
就是Nacos
中管理服务、实例信息的核心API,其中就包含Nacos
的服务注册表:
而其中的registerInstance
方法就是注册服务实例的方法:
/** * Register an instance to a service in AP mode. * * <p>This method creates service or cluster silently if they don't exist. * * @param namespaceId id of namespace * @param serviceName service name * @param instance instance to register * @throws Exception any error occurred in the process */
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// 创建一个空的service(如果是第一次来注册实例,要先创建一个空service出来,放入注册表)
// 此时不包含实例信息
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// 拿到创建好的service
Service service = getService(namespaceId, serviceName);
// 拿不到则抛异常
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
// 添加要注册的实例到service中
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
创建好了服务,接下来就要添加实例到服务中:
/** * Add instance to service. * * @param namespaceId namespace * @param serviceName service name * @param ephemeral whether instance is ephemeral * @param ips instances * @throws NacosException nacos exception */
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
// 监听服务列表用到的key,服务唯一标识,例如:com.alibaba.nacos.naming.iplist.ephemeral.public##[email protected]@order-service
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// 获取服务
Service service = getService(namespaceId, serviceName);
// 同步锁,避免并发修改的安全问题
synchronized (service) {
// 1)获取要更新的实例列表
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
// 2)封装实例列表到Instances对象
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 3)完成 注册表更新 以及 Nacos集群的数据同步
consistencyService.put(key, instances);
}
}
该方法中对修改服务列表的动作加锁处理,确保线程安全.而在同步代码块中,包含下面几步:
1)先获取要更新的实例列表,addIpAddresses(service, ephemeral, ips);
2)然后将更新后的数据封装到Instances
对象中,后面更新注册表时使用
3)最后,调用consistencyService.put()
方法完成Nacos
集群的数据同步,保证集群一致性.
注意:在第1步的addIPAddress中,会拷贝旧的实例列表,添加新实例到列表中.在第3步中,完成对实例状态更新后,则会用新列表直接覆盖旧实例列表.而在更新过程中,旧实例列表不受影响,用户依然可以读取.
这样在更新列表状态过程中,无需阻塞用户的读操作,也不会导致用户读取到脏数据,性能比较好.这种方案称为CopyOnWrite方案.
5.2.1.更服务列表
我们来看看实例列表的更新,对应的方法是addIpAddresses(service, ephemeral, ips);
:
private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}
继续进入updateIpAddresses
方法:
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
throws NacosException {
// 根据namespaceId、serviceName获取当前服务的实例列表,返回值是Datum
// 第一次来,肯定是null
Datum datum = consistencyService
.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
// 得到服务中现有的实例列表
List<Instance> currentIPs = service.allIPs(ephemeral);
// 创建map,保存实例列表,key为ip地址,value是Instance对象
Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
// 创建Set集合,保存实例的instanceId
Set<String> currentInstanceIds = Sets.newHashSet();
// 遍历要现有的实例列表
for (Instance instance : currentIPs) {
// 添加到map中
currentInstances.put(instance.toIpAddr(), instance);
// 添加instanceId到set中
currentInstanceIds.add(instance.getInstanceId());
}
// 创建map,用来保存更新后的实例列表
Map<String, Instance> instanceMap;
if (datum != null && null != datum.value) {
// 如果服务中已经有旧的数据,则先保存旧的实例列表
instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
} else {
// 如果没有旧数据,则直接创建新的map
instanceMap = new HashMap<>(ips.length);
}
// 遍历实例列表
for (Instance instance : ips) {
// 判断服务中是否包含要注册的实例的cluster信息
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
// 如果不包含,创建新的cluster
Cluster cluster = new Cluster(instance.getClusterName(), service);
cluster.init();
// 将集群放入service的注册表
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
}
// 删除实例 or 新增实例 ?
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
instanceMap.remove(instance.getDatumKey());
} else {
// 新增实例,instance生成全新的instanceId
Instance oldInstance = instanceMap.get(instance.getDatumKey());
if (oldInstance != null) {
instance.setInstanceId(oldInstance.getInstanceId());
} else {
instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
}
// 放入instance列表
instanceMap.put(instance.getDatumKey(), instance);
}
}
if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException(
"ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
.toJson(instanceMap.values()));
}
// 将instanceMap中的所有实例转为List返回
return new ArrayList<>(instanceMap.values());
}
简单来讲,就是先获取旧的实例列表,然后把新的实例信息与旧的做对比,新的实例就添加,老的实例同步ID.然后返回最新的实例列表.
5.2.2.Nacos集群一致性
在完成本地服务列表更新后,Nacos又实现了集群一致性更新,调用的是:
consistencyService.put(key, instances);
这里的ConsistencyService
接口,代表集群一致性的接口,有很多中不同实现:
我们进入DelegateConsistencyServiceImpl
来看:
@Override
public void put(String key, Record value) throws NacosException {
// 根据实例是否是临时实例,判断委托对象
mapConsistencyService(key).put(key, value);
}
其中的mapConsistencyService(key)
方法就是选择委托方式的:
private ConsistencyService mapConsistencyService(String key) {
// 判断是否是临时实例:
// 是,选择 ephemeralConsistencyService,也就是 DistroConsistencyServiceImpl类
// 否,选择 persistentConsistencyService,也就是PersistentConsistencyServiceDelegateImpl
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
默认情况下,所有实例都是临时实例,我们关注DistroConsistencyServiceImpl
即可.
5.2.3.DistroConsistencyServiceImpl
我们来看临时实例的一致性实现:DistroConsistencyServiceImpl
类的put
方法:
public void put(String key, Record value) throws NacosException {
// 先将要更新的实例信息写入本地实例列表
onPut(key, value);
// 开始集群同步
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
这里方法只有两行:
onPut(key, value)
:其中value就是Instances,要更新的服务信息.这行主要是基于线程池方式,异步的将Service信息写入注册表中(就是那个多重Map)distroProtocol.sync()
:就是通过Distro
协议将数据同步给集群中的其它Nacos
节点
我们先看onPut
方法
5.2.4.更新本地实例列表
1)放入阻塞队列
onPut
方法如下:
public void onPut(String key, Record value) {
// 判断是否是临时实例
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
// 封装 Instances 信息到 数据集:Datum
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
// 放入DataStore
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
// 放入阻塞队列,这里的 notifier维护了一个阻塞队列,并且基于线程池异步执行队列中的任务
notifier.addTask(key, DataOperation.CHANGE);
}
notifier
的类型就是DistroConsistencyServiceImpl
.Notifier
,内部维护了一个阻塞队列,存放服务列表变更的事件:
addTask
时,将任务加入该阻塞队列:
// DistroConsistencyServiceImpl.Notifier类的 addTask 方法:
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
// 任务放入阻塞队列
tasks.offer(Pair.with(datumKey, action));
}
2)Notifier异步更新
同时,notifier
还是一个Runnable
,通过一个单线程的线程池来不断从阻塞队列中获取任务,执行服务列表的更新.来看下其中的run方法:
// DistroConsistencyServiceImpl.Notifier类的run方法:
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
// 死循环,不断执行任务.因为是阻塞队列,不会导致CPU负载过高
for (; ; ) {
try {
// 从阻塞队列中获取任务
Pair<String, DataOperation> pair = tasks.take();
// 处理任务,更新服务列表
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
来看看handle方法:
// DistroConsistencyServiceImpl.Notifier类的 handle 方法:
private void handle(Pair<String, DataOperation> pair) {
try {
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
return;
}
// 遍历,找到变化的service,这里的 RecordListener就是 Service
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
// 服务的实例列表CHANGE事件
if (action == DataOperation.CHANGE) {
// 更新服务列表
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
// 服务的实例列表 DELETE 事件
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO
.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
3)覆盖实例列表
而在Service
的onChange
方法中,就可以看到更新实例列表的逻辑了:
@Override
public void onChange(String key, Instances value) throws Exception {
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
// 更新实例列表
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
}
updateIPs方法:
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
// 准备一个Map,key是cluster,值是集群下的Instance集合
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
// 获取服务的所有cluster名称
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}
// 遍历要更新的实例
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}
// 判断实例是否包含clusterName,没有的话用默认cluster
if (StringUtils.isEmpty(instance.getClusterName())) {
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
// 判断cluster是否存在,不存在则创建新的cluster
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
getClusterMap().put(instance.getClusterName(), cluster);
}
// 获取当前cluster实例的集合,不存在则创建新的
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
// 添加新的实例到 Instance 集合
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
// 将实例集合更新到 clusterMap(注册表)
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
setLastModifiedMillis(System.currentTimeMillis());
// 发布服务变更的通知消息
getPushService().serviceChanged(this);
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {
stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
}
Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
stringBuilder.toString());
}
在第45行的代码中:clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
就是在更新注册表:
public void updateIps(List<Instance> ips, boolean ephemeral) {
// 获取旧实例列表
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
for (Instance ip : toUpdateInstances) {
oldIpMap.put(ip.getDatumKey(), ip);
}
// 检查新加入实例的状态
List<Instance> newIPs = subtract(ips, oldIpMap.values());
if (newIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
getName(), newIPs.size(), newIPs.toString());
for (Instance ip : newIPs) {
HealthCheckStatus.reset(ip);
}
}
// 移除要删除的实例
List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
if (deadIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
getName(), deadIPs.size(), deadIPs.toString());
for (Instance ip : deadIPs) {
HealthCheckStatus.remv(ip);
}
}
toUpdateInstances = new HashSet<>(ips);
// 直接覆盖旧实例列表
if (ephemeral) {
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}
5.2.5.集群数据同步
在DistroConsistencyServiceImpl
的put方法中分为两步:
其中的onPut
方法已经分析过了.
下面的distroProtocol.sync()
就是集群同步的逻辑了.
DistroProtocol
类的sync
方法如下:
public void sync(DistroKey distroKey, DataOperation action, long delay) {
// 遍历 Nacos 集群中除自己以外的其它节点
for (Member each : memberManager.allMembersWithoutSelf()) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
// 定义一个Distro的同步任务
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
// 交给线程池去执行
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
}
}
}
其中同步的任务封装为一个DistroDelayTask
对象.
交给了distroTaskEngineHolder.getDelayTaskExecuteEngine()
执行,这行代码的返回值是:
NacosDelayTaskExecuteEngine
,这个类维护了一个线程池,并且接收任务,执行任务.
执行任务的方法为processTasks()
方法:
protected void processTasks() {
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
AbstractDelayTask task = removeTask(taskKey);
if (null == task) {
continue;
}
NacosTaskProcessor processor = getProcessor(taskKey);
if (null == processor) {
getEngineLog().error("processor not found for task, so discarded. " + task);
continue;
}
try {
// 尝试执行同步任务,如果失败会重试
if (!processor.process(task)) {
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error : " + e.toString(), e);
retryFailedTask(taskKey, task);
}
}
}
可以看出来基于Distro
模式的同步是异步进行的,并且失败时会将任务重新入队并充实,因此不保证同步结果的强一致性,属于AP模式的一致性策略.
5.3.服务端流程图
6.总结
6.1.Nacos的注册表结构是什么样的?
答:Nacos
是多级存储模型,最外层通过namespace来实现环境隔离,然后是group
分组,分组下就是服务,一个服务有可以分为不同的集群,集群中包含多个实例.因此其注册表结构为一个Map,类型是:
Map<String, Map<String, Service>>
,
外层key
是namespace_id
,内层key
是group+serviceName
.
Service
内部维护一个Map
,结构是:Map<String,Cluster>
,key
是clusterName
,值是集群信息
Cluster
内部维护一个Set集合,元素是Instance
类型,代表集群中的多个实例.
6.2.Nacos如何保证并发写的安全性?
答:首先,在注册实例时,会对service
加锁,不同service之间本身就不存在并发写问题,互不影响.相同service
时通过锁来互斥.并且,在更新实例列表时,是基于异步的线程池来完成,而线程池的线程数量为1.
6.3.Nacos如何避免并发读写的冲突?
答:Nacos在更新实例列表时,会采用CopyOnWrite技术,首先将Old实例列表拷贝一份,然后更新拷贝的实例列表,再用更新后的实例列表来覆盖旧的实例列表.
6.4.Nacos如何应对阿里内部数十万服务的并发写请求?
答:Nacos内部会将服务注册的任务放入阻塞队列,采用线程池异步来完成实例更新,从而提高并发写能力.
边栏推荐
- 60 Feature Engineering Operations: Using Custom Aggregate Functions【Favorites】
- 1688API
- 工程师如何对待开源
- GTK RGB图像绘制
- AcWing 1053. 修复DNA 题解(状态机DP、AC自动机)
- PHP live source code to achieve simple barrage effect related code
- How ReentrantLock works
- Docker-compose安装mysql
- analog IC layout-Design for reliability
- mysql 查看死锁
猜你喜欢
四元数、罗德里格斯公式、欧拉角、旋转矩阵推导和资料
【每日一道LeetCode】——1. 两数之和
GTK RGB图像绘制
51. 数字排列
BI-SQL丨WHILE
接口测试神器Apifox究竟有多香?
Reflex WMS Intermediate Series 7: What should I do if I want to cancel the picking of an HD that has finished picking but has not yet been loaded?
Safety (2)
analog IC layout-Parasitic effects
The principle and code implementation of intelligent follower robot in the actual combat of innovative projects
随机推荐
2022 NPDP take an examination of how the results?How to query?
VPS8701 电源管理(PMIC) VPS8701
字典常用方法
GTK RGB图像绘制
详解最强分布式锁工具:Redisson
PAT甲级打卡-1001-1004
cocos中使用async await异步加载资源
analog IC layout
Flask入门学习教程
Nanoprobes Polyhistidine (His-) Tag: Recombinant Protein Detection Protocol
非稳压 源特电子 隔离电源模块芯片 5W VPS8504B 24V
C#测试项目中属性的用法
微服务:微智能在软件系统的简述
IMU预积分的简单理解
Qt自定义控件和模板分享
递归检查配置项是否更变并替换
The principle and code implementation of intelligent follower robot in the actual combat of innovative projects
MySQL索引优化实战
ReentrantLock工作原理
analog IC layout-Parasitic effects