当前位置:网站首页>因为一次bug的教训,我决定手撕Nacos源码(先撕客户端源码)
因为一次bug的教训,我决定手撕Nacos源码(先撕客户端源码)
2022-08-04 00:05:00 【博学谷狂野架构师】
Nacos源码剖析
Nacos源码有很多值得我们学习的地方,为了深入理解Nacos,我们剖析源码,分析如下2个知识点:
1:Nacos对注册中心的访问原理
2:Nacos注册服务处理流程
源码环境搭建
1、从官方项目上克隆下来,并且检出 1.4.1 版本,导入idea。nacos源码环境搭建起来比较轻松,几乎不会报什么错误,导入后编译安装到本地环境即可
2、找到config
模块中找到 \resources\META-INF\nacos-db.sql
,在本地mysql中创建数据库nacos-config
,将该脚本导入执行创建表。
3、找到console
模块下的配置文件application.properties
,修改相关配置
#*************** Config Module Related Configurations ***************#
### If use MySQL as datasource:
spring.datasource.platform=mysql
### Count of DB:
db.num=1
### Connect URL of DB:
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos-config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
db.user.0=nacos
db.password.0=nacos
4、找到console
模块下的启动类,启动nacos的服务端,启动时添加启动参数,指定启动模式为非集群启动
-Dnacos.standalone=true
5、访问本地的nacos:http://localhost:8848/nacos
至此,源码环境搭建成功!
nacos客户端
首先要搞清楚:nacos的客户端其实在我们自己的服务里,我们引入了nacos的相关坐标依赖,nacos客户端以jar包的形式在我们的服务中工作
对于nacos的客户端,它都要帮我们实现什么功能呢?
它的主要工作有:服务注册、服务发现、服务下线操作、服务订阅操作等相关操作。
客户端与注册中心服务端的交互,主要集中在服务注册、服务下线、服务发现、订阅某个服务,其实使用最多的就是服务注册和服务发现,下面我会从源码的角度分析一下这四个功能。
在Nacos源码中nacos-example
中com.alibaba.nacos.example.NamingExample
类分别演示了这4个功能的操作,我们可以把它当做入口,代码如下:
public class NamingExample {
public static void main(String[] args) throws NacosException {
Properties properties = new Properties();
properties.setProperty("serverAddr", System.getProperty("serverAddr"));
properties.setProperty("namespace", System.getProperty("namespace"));
NamingService naming = NamingFactory.createNamingService(properties);
//服务注册
naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");
naming.registerInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
//服务发现
System.out.println(naming.getAllInstances("nacos.test.3"));
//服务下线
naming.deregisterInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
System.out.println(naming.getAllInstances("nacos.test.3"));
//服务订阅
Executor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("test-thread");
return thread;
}
});
naming.subscribe("nacos.test.3", new AbstractEventListener() {
//EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback.
//So you can override getExecutor() to async handle event.
@Override
public Executor getExecutor() {
return executor;
}
@Override
public void onEvent(Event event) {
System.out.println(((NamingEvent) event).getServiceName());
System.out.println(((NamingEvent) event).getInstances());
}
});
}
}
1.1.1 服务注册
我们沿着案例中的服务注册方法调用找到nacos-api
中的NamingService.registerInstance()
并找到它的实现类和方法com.alibaba.nacos.client.naming.NacosNamingService
,代码如下:
/*** * 服务注册 * @param serviceName 服务名字 * @param ip 服务IP * @param port 服务端口 * @param clusterName 集群名字 * @throws NacosException */
@Override
public void registerInstance(String serviceName, String ip, int port, String clusterName) throws NacosException {
registerInstance(serviceName, Constants.DEFAULT_GROUP, ip, port, clusterName);
}
@Override
public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
throws NacosException {
//设置实例IP:Port,默认权重为1.0
Instance instance = new Instance();
instance.setIp(ip);
instance.setPort(port);
instance.setWeight(1.0);
instance.setClusterName(clusterName);
//注册实例
registerInstance(serviceName, groupName, instance);
}
@Override
public void registerInstance(String serviceName, Instance instance) throws NacosException {
registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
}
/*** * 实例注册 * @param serviceName name of service * @param groupName group of service * @param instance instance to register * @throws NacosException */
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
//该字段表示注册的实例是否是临时实例还是持久化实例。
// 如果是临时实例,则不会在 Nacos 服务端持久化存储,需要通过上报心跳的方式进行包活,
// 如果一段时间内没有上报心跳,则会被 Nacos 服务端摘除。
if (instance.isEphemeral()) {
//为注册服务设置一个定时任务获取心跳信息,默认为5s汇报一次
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
//注册到服务端
serverProxy.registerService(groupedServiceName, groupName, instance);
}
注册主要做了两件事,第一件事:为注册的服务设置一个定时心跳任务。 第二件事:将服务注册到服务端。
1:启动一个定时心跳任务,时间间隔为5s,如果服务正常,不做处理,如果不正常,重新注册
2:发送http请求给注册中心服务端,调用服务注册接口,注册服务
上面代码我们可以看到定时任务添加,但并未完全看到远程请求,serverProxy.registerService()
方法如下,会先封装请求参数,接下来调用reqApi()
而reqApi()
最后会调用callServer()
,代码如下:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
//封装Http请求参数
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
//执行Http请求
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
/*** *执行远程调用 **/
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
String method) throws NacosException {
long start = System.currentTimeMillis();
long end = 0;
injectSecurityInfo(params);
//封装请求头部
Header header = builderHeader();
//请求是Http还是Https协议
String url;
if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
url = curServer + api;
} else {
if (!IPUtil.containsPort(curServer)) {
curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;
}
url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
}
try {
//执行远程请求,并获取结果集
HttpRestResult<String> restResult = nacosRestTemplate
.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
.observe(end - start);
//结果集解析
if (restResult.ok()) {
return restResult.getData();
}
if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
return StringUtils.EMPTY;
}
throw new NacosException(restResult.getCode(), restResult.getMessage());
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to request", e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}
执行远程Http请求的对象是NacosRestTemplate
,该对象就是封装了普通的Http请求,大家可以自己查阅一下。
1.1.2 服务发现
两个入口:
1、NamingService.getAllInstances(serviceName)
我们沿着案例中的服务发现方法调用找到nacos-api
中的NamingService.getAllInstances()
并找到它的实现类和方法com.alibaba.nacos.client.naming.NacosNamingService.getAllInstances()
,代码如下:
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
/*默认true->获取服务实例*/
if (subscribe) {
//从本地缓存中获取,如果本地缓存不存在从服务端拉取
//本地缓存会存储在HostReactor.serviceInfoMap中,它是一个Map对象
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor
.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}
上面的代码调用了hostReactor.getServiceInfo()
方法,该方法会先调用getServiceInfo0()
方法从本地缓存获取数据,缓存没有数据,就构建实例更新到Nacos,并从Nacos中获取最新数据,getServiceInfo0()
方法源码如下:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
/*1。先从本地缓存中获取服务对象,因为启动是第一次进来,所以缓存暂不存在*/
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) {
/*构建服务实例*/
serviceObj = new ServiceInfo(serviceName, clusters);
/*将服务实例存放到缓存中*/
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
/*更新nacos-上的服务*/
updatingMap.put(serviceName, new Object());
/*主动获取,并且更新到缓存本地,以及已过期的服务更新等*/
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
/*2.开启定时任务*/
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
updateServiceNow(serviceName, clusters);
主从从远程服务器获取更新数据,最终会调用updateService()
方法,在该方法中完成远程请求和数据处理,源码如下:
public void updateService(String serviceName, String clusters) throws NacosException {
/*获取本地缓存列表中所存在的服务*/
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
/*获取服务以及提供者端口信息,端口等*/
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
if (StringUtils.isNotEmpty(result)) {
/*对结果进行处理*/
processServiceJson(result);
}
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
1.1.3 服务下线
我们沿着案例中的服务下线方法调用找到nacos-api
中的NamingService.deregisterInstance()
并找到它的实现类和方法NacosNamingService.deregisterInstance()
,代码如下:
@Override
public void deregisterInstance(String serviceName, String groupName, String ip, int port, String clusterName)
throws NacosException {
//构建实例信息
Instance instance = new Instance();
instance.setIp(ip);
instance.setPort(port);
instance.setClusterName(clusterName);
//服务下线操作
deregisterInstance(serviceName, groupName, instance);
}
@Override
public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) {
//移除心跳信息监测的定时任务
beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(),
instance.getPort());
}
//发送远程请求执行服务下线销毁操作
serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
}
服务下线方法比较简单,和服务注册做的事情正好相反,也做了两件事,第一件事:不在进行心跳检测。 第二件事:请求服务端服务下线接口。
1.1.4 服务订阅
我们可以查看订阅服务的案例,会先创建一个线程池,接下来会把线程池封装到监听器中,而监听器中可以监听指定实例信息,代码如下:
//服务订阅
Executor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("test-thread");
return thread;
}
});
naming.subscribe("nacos.test.3", new AbstractEventListener() {
//EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback.
//So you can override getExecutor() to async handle event.
@Override
public Executor getExecutor() {
return executor;
}
//读取监听到的服务实例
@Override
public void onEvent(Event event) {
System.out.println(((NamingEvent) event).getServiceName());
System.out.println(((NamingEvent) event).getInstances());
}
});
我们沿着案例中的服务订阅方法调用找到nacos-api
中的NamingService.subscribe()
并找到它的实现类和方法NacosNamingService.deregisterInstance()
,代码如下:
public void subscribe(String serviceName, String clusters, EventListener eventListener) {
//注册监听
notifier.registerListener(serviceName, clusters, eventListener);
//获取并更新服务实例
getServiceInfo(serviceName, clusters);
}
此时会注册监听,注册监听,注册监听就是将当前的监听对象信息注入到listenerMap集合中,在监听对象的指定方法onEvent中可以读取实例信息,代码如下:
public void registerListener(String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(serviceName, clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (eventListeners == null) {
synchronized (lock) {
eventListeners = listenerMap.get(key);
if (eventListeners == null) {
eventListeners = new ConcurrentHashSet<EventListener>();
listenerMap.put(key, eventListeners);
}
}
}
//将当前监听对象放入到集合中,在监听对象的onEvent中可以读出对应的实例对象
eventListeners.add(listener);
}
本文由传智教育博学谷 - 狂野架构师教研团队发布
如果本文对您有帮助,欢迎关注和点赞;如果您有任何建议也可留言评论或私信,您的支持是我坚持创作的动力
转载请注明出处!
边栏推荐
- C语言实验十五 文件
- 搭建好pytorch环境后,pip和conda指令不能用
- R3LIVE论文学习(二):VIO子系统
- 详谈RDMA技术原理和三种实现方式
- 【超详细教程】LVS+KeepAlived高可用部署实战应用
- ros mavros stereo读取rosbag并记录IMU和图片到文件夹
- 一文参透分布式存储系统Ceph的架构设计、集群搭建(手把手)
- The Chinese Valentine's Day event is romantically launched, don't let the Internet slow down and miss the dark time
- Node.js的基本使用(三)数据库与身份认证
- 超级完美版布局有快捷键,有背景置换
猜你喜欢
[Miscellaneous] How to install the specified font into the computer and then use the font in the Office software?
Go编译原理系列7(Go源码调试)
我的祖国
View the version number of CUDA, pytorch, etc.
Using matlab to solve the linear optimization problem based on matlab dynamic model of learning notes _11 】 【
Read FastDFS in one article
Three.js入门详解
C# wpf使用ffmpeg命令行实现录屏
绕任意轴旋转矩阵推导
win10+cuda11.7+pytorch1.12.0 installation
随机推荐
【杂项】如何将指定字体装入电脑然后能在Office软件里使用该字体?
DataBinding下的RecycleView适配器Adapter基类
LYVE1抗体丨Relia Tech LYVE1抗体解决方案
MPLS综合实验
七夕活动浪漫上线,别让网络拖慢和小姐姐的开黑时间
MPLS Comprehensive Experiment
Nanoprobes丨Nanogold-抗体和链霉亲和素偶联物
C语言实验十四 结构体
Internship: Upload method for writing excel sheet (import)
ENS域名注册量创历史新高 逆市增长之势?光环之下存在炒作风险
带你造轮子,自定义一个随意拖拽可吸边的悬浮View组件
HNUCM 您好中国
internship:编写excel表的上传方法(导入)
jav一键生成数据库文档
The longest substring that cannot have repeating characters in a leetcode/substring
LeetCode 19:删除链表的倒数第 N 个结点
Unity intercepts 3D images and the implementation of picture-in-picture PIP
直播系统聊天技术(八):vivo直播系统中IM消息模块的架构实践
2022-08-03:以下go语言代码输出什么?A:2;B:3;C:1;D:0。 package main import “fmt“ func main() { slice := []i
【杂项】通过Excel为字符串产生条码