当前位置:网站首页>手撕Nacos源码,今日撕服务端源码
手撕Nacos源码,今日撕服务端源码
2022-08-04 00:05:00 【博学谷狂野架构师】
紧接上文,我们分析了Nacos的客户端代码,
手撕Nacos源码(先撕客户端源码)
今天我们再来试一下服务端 ,至此就可以Nacos源码就告一段落,欢迎大家品鉴。
nacos服务端
注册中心服务端的主要功能包括,接收客户端的服务注册,服务发现,服务下线的功能,但是除了这些和客户端的交互之外,服务端还要做一些更重要的事情,就是我们常常会在分布式系统中听到的AP和CP,作为一个集群,nacos即实现了AP也实现了CP,其中AP使用的自己实现的Distro协议,而CP是采用raft协议实现的,这个过程中牵涉到心跳、选主等操作。
我们来学习一下注册中心服务端接收客户端服务注册的功能。
注册处理
我们先来学习一下Nacos的工具类WebUtils
,该工具类在nacos-core
工程下,该工具类是用于处理请求参数转化的,里面提供了2个常被用到的方法required()
和optional()
:
required方法通过参数名key,解析HttpServletRequest请求中的参数,并转码为UTF-8编码。
optional方法在required方法的基础上增加了默认值,如果获取不到,则返回默认值。
代码如下:
/** * required方法通过参数名key,解析HttpServletRequest请求中的参数,并转码为UTF-8编码。 */
public static String required(final HttpServletRequest req, final String key) {
String value = req.getParameter(key);
if (StringUtils.isEmpty(value)) {
throw new IllegalArgumentException("Param '" + key + "' is required.");
}
String encoding = req.getParameter("encoding");
return resolveValue(value, encoding);
}
/** * optional方法在required方法的基础上增加了默认值,如果获取不到,则返回默认值。 */
public static String optional(final HttpServletRequest req, final String key, final String defaultValue) {
if (!req.getParameterMap().containsKey(key) || req.getParameterMap().get(key)[0] == null) {
return defaultValue;
}
String value = req.getParameter(key);
if (StringUtils.isBlank(value)) {
return defaultValue;
}
String encoding = req.getParameter("encoding");
return resolveValue(value, encoding);
}
nacos 的 server 与 client
使用了http
协议来交互,那么在server
端必定提供了http
接口的入口,并且在core
模块看到其依赖了spring boot starter
,所以它的http接口由集成了Spring的web服务器支持,简单地说就是像我们平时写的业务服务一样,有controller层和service层。
以OpenAPI作为入口来学习,我们找到/nacos/v1/ns/instance
服务注册接口,在nacos-naming
工程中我们可以看到InstanceController
正是我们要找的对象,如下图:
处理服务注册,我们直接找对应的POST方法即可,代码如下:
/** * Register new instance. * 接收客户端注册信息 * @param request http request * @return 'ok' if success * @throws Exception any error during register */
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
//获取namespaceid,该参数是可选参数
final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
//获取服务名字
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
//校验服务的名字,服务的名字格式为[email protected]@serviceName
NamingUtils.checkServiceNameFormat(serviceName);
//创建实例
final Instance instance = parseInstance(request);
//注册服务
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
如上图,该方法主要用于接收客户端注册信息,并且会校验参数是否存在问题,如果不存在问题就创建服务的实例,服务实例创建后将服务实例注册到Nacos中,注册的方法如下:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
//判断本地缓存中是否存在该命名空间,如果不存在就创建,之后判断该命名空间下是否
//存在该服务,如果不存在就创建空的服务
//如果实例为空,则创建实例,并且会将创建的实例存入到serviceMap集合中
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
//从serviceMap集合中获取创建的实例
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
//服务注册,这一步才会把服务的实例信息和服务绑定起来
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
注册的方法中会先创建该实例对象,创建前先检查本地缓存是否存在该实例对象,如果不存在就创建,最后注册该服务,并且该服务会和实例信息捆绑到一起。
Distro协议介绍
Distro是阿里巴巴的私有协议, 是一种分布式一致性算法,目前流行的Nacos服务管理框架就采用了Distro协议。Distro 协议被定位为 临时数据的一致性协议:该类型协议, 不需要把数据存储到磁盘或者数据库,因为临时数据通常和服务器保持一个session会话, 该会话只要存在,数据就不会丢失 。
Distro 协议保证写必须永远是成功的,即使可能会发生网络分区。当网络恢复时,把各数据分片的数据进行合并。
Distro 协议具有以下特点:
1:专门为了注册中心而创造出的协议;
2:客户端与服务端有两个重要的交互,服务注册与心跳发送;
3:客户端以服务为维度向服务端注册,注册后每隔一段时间向服务端发送一次心跳,心跳包需要带上注册服务的全部信息,在客户端看来,服务端节点对等,所以请求的节点是随机的;
4:客户端请求失败则换一个节点重新发送请求;
5:服务端节点都存储所有数据,但每个节点只负责其中一部分服务,在接收到客户端的“写”(注册、心跳、下线等)请求后,服务端节点判断请求的服务是否为自己负责,如果是,则处理,否则交由负责的节点处理;
6:每个服务端节点主动发送健康检查到其他节点,响应的节点被该节点视为健康节点;
7:服务端在接收到客户端的服务心跳后,如果该服务不存在,则将该心跳请求当做注册请求来处理;
8:服务端如果长时间未收到客户端心跳,则下线该服务;
9:负责的节点在接收到服务注册、服务心跳等写请求后将数据写入后即返回,后台异步地将数据同步给其他节点;
10:节点在收到读请求后直接从本机获取后返回,无论数据是否为最新。
Distro寻址
Distro协议主要用于nacos 服务端节点之间的相互发现,nacos使用寻址机制来实现服务端节点的管理。在Nacos中,寻址模式有三种:
单机模式(StandaloneMemberLookup)
文件模式(FileConfigMemberLookup)
服务器模式(AddressServerMemberLookup)
三种寻址模式如下图:
1.2.3.1 单机模式
在com.alibaba.nacos.core.cluster.lookup.LookupFactory
中有创建寻址方式,可以创建集群启动方式、单机启动方式,不同启动方式就决定了不同寻址模式,如果是集群启动,
/** * Create the target addressing pattern. * 创建寻址模式 * @param memberManager {@link ServerMemberManager} * @return {@link MemberLookup} * @throws NacosException NacosException */
public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {
//NacosServer 集群方式启动
if (!EnvUtil.getStandaloneMode()) {
String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);
//由参数中传入的寻址方式得到LookupType对象
LookupType type = chooseLookup(lookupType);
//选择寻址方式
LOOK_UP = find(type);
//设置当前寻址方式
currentLookupType = type;
} else {
//NacosServer单机启动
LOOK_UP = new StandaloneMemberLookup();
}
LOOK_UP.injectMemberManager(memberManager);
Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName());
return LOOK_UP;
}
/*** * 选择寻址方式 * @param type * @return */
private static MemberLookup find(LookupType type) {
//文件寻址模式,也就是配置cluster.conf配置文件将多个节点串联起来,
// 通过配置文件寻找其他节点,以达到和其他节点通信的目的
if (LookupType.FILE_CONFIG.equals(type)) {
LOOK_UP = new FileConfigMemberLookup();
return LOOK_UP;
}
//服务器模式
if (LookupType.ADDRESS_SERVER.equals(type)) {
LOOK_UP = new AddressServerMemberLookup();
return LOOK_UP;
}
// unpossible to run here
throw new IllegalArgumentException();
}
单节点寻址模式会直接创建StandaloneMemberLookup
对象,而文件寻址模式会创建FileConfigMemberLookup
对象,服务器寻址模式会创建AddressServerMemberLookup
;
1.2.3.2 文件寻址模式
文件寻址模式主要在创建集群的时候,通过cluster.conf
来配置集群,程序可以通过监听cluster.conf
文件变化实现动态管理节点,FileConfigMemberLookup
源码如下:
public class FileConfigMemberLookup extends AbstractMemberLookup {
//创建文件监听器
private FileWatcher watcher = new FileWatcher() {
//文件发生变更事件
@Override
public void onChange(FileChangeEvent event) {
readClusterConfFromDisk();
}
//检查context是否包含cluster.conf
@Override
public boolean interest(String context) {
return StringUtils.contains(context, "cluster.conf");
}
};
@Override
public void start() throws NacosException {
if (start.compareAndSet(false, true)) {
readClusterConfFromDisk();
// 使用inotify机制来监视文件更改,并自动触发对cluster.conf的读取
try {
WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
} catch (Throwable e) {
Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());
}
}
}
@Override
public void destroy() throws NacosException {
WatchFileCenter.deregisterWatcher(EnvUtil.getConfPath(), watcher);
}
private void readClusterConfFromDisk() {
Collection<Member> tmpMembers = new ArrayList<>();
try {
List<String> tmp = EnvUtil.readClusterConf();
tmpMembers = MemberUtil.readServerConf(tmp);
} catch (Throwable e) {
Loggers.CLUSTER
.error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());
}
afterLookup(tmpMembers);
}
}
1.2.3.3 服务器寻址模式
使用地址服务器存储节点信息,会创建AddressServerMemberLookup
,服务端定时拉取信息进行管理;
public class AddressServerMemberLookup extends AbstractMemberLookup {
private final GenericType<RestResult<String>> genericType = new GenericType<RestResult<String>>() {
};
public String domainName;
public String addressPort;
public String addressUrl;
public String envIdUrl;
public String addressServerUrl;
private volatile boolean isAddressServerHealth = true;
private int addressServerFailCount = 0;
private int maxFailCount = 12;
private final NacosRestTemplate restTemplate = HttpClientBeanHolder.getNacosRestTemplate(Loggers.CORE);
private volatile boolean shutdown = false;
@Override
public void start() throws NacosException {
if (start.compareAndSet(false, true)) {
this.maxFailCount = Integer.parseInt(EnvUtil.getProperty("maxHealthCheckFailCount", "12"));
initAddressSys();
run();
}
}
/*** * 获取服务器地址 */
private void initAddressSys() {
String envDomainName = System.getenv("address_server_domain");
if (StringUtils.isBlank(envDomainName)) {
domainName = EnvUtil.getProperty("address.server.domain", "jmenv.tbsite.net");
} else {
domainName = envDomainName;
}
String envAddressPort = System.getenv("address_server_port");
if (StringUtils.isBlank(envAddressPort)) {
addressPort = EnvUtil.getProperty("address.server.port", "8080");
} else {
addressPort = envAddressPort;
}
String envAddressUrl = System.getenv("address_server_url");
if (StringUtils.isBlank(envAddressUrl)) {
addressUrl = EnvUtil.getProperty("address.server.url", EnvUtil.getContextPath() + "/" + "serverlist");
} else {
addressUrl = envAddressUrl;
}
addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl;
envIdUrl = "http://" + domainName + ":" + addressPort + "/env";
Loggers.CORE.info("ServerListService address-server port:" + addressPort);
Loggers.CORE.info("ADDRESS_SERVER_URL:" + addressServerUrl);
}
@SuppressWarnings("PMD.UndefineMagicConstantRule")
private void run() throws NacosException {
// With the address server, you need to perform a synchronous member node pull at startup
// Repeat three times, successfully jump out
boolean success = false;
Throwable ex = null;
int maxRetry = EnvUtil.getProperty("nacos.core.address-server.retry", Integer.class, 5);
for (int i = 0; i < maxRetry; i++) {
try {
//拉取集群节点信息
syncFromAddressUrl();
success = true;
break;
} catch (Throwable e) {
ex = e;
Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));
}
}
if (!success) {
throw new NacosException(NacosException.SERVER_ERROR, ex);
}
//创建定时任务
GlobalExecutor.scheduleByCommon(new AddressServerSyncTask(), 5_000L);
}
@Override
public void destroy() throws NacosException {
shutdown = true;
}
@Override
public Map<String, Object> info() {
Map<String, Object> info = new HashMap<>(4);
info.put("addressServerHealth", isAddressServerHealth);
info.put("addressServerUrl", addressServerUrl);
info.put("envIdUrl", envIdUrl);
info.put("addressServerFailCount", addressServerFailCount);
return info;
}
private void syncFromAddressUrl() throws Exception {
RestResult<String> result = restTemplate
.get(addressServerUrl, Header.EMPTY, Query.EMPTY, genericType.getType());
if (result.ok()) {
isAddressServerHealth = true;
Reader reader = new StringReader(result.getData());
try {
afterLookup(MemberUtil.readServerConf(EnvUtil.analyzeClusterConf(reader)));
} catch (Throwable e) {
Loggers.CLUSTER.error("[serverlist] exception for analyzeClusterConf, error : {}",
ExceptionUtil.getAllExceptionMsg(e));
}
addressServerFailCount = 0;
} else {
addressServerFailCount++;
if (addressServerFailCount >= maxFailCount) {
isAddressServerHealth = false;
}
Loggers.CLUSTER.error("[serverlist] failed to get serverlist, error code {}", result.getCode());
}
}
// 定时任务
class AddressServerSyncTask implements Runnable {
@Override
public void run() {
if (shutdown) {
return;
}
try {
//拉取服务列表
syncFromAddressUrl();
} catch (Throwable ex) {
addressServerFailCount++;
if (addressServerFailCount >= maxFailCount) {
isAddressServerHealth = false;
}
Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));
} finally {
GlobalExecutor.scheduleByCommon(this, 5_000L);
}
}
}
}
数据同步
Nacos数据同步分为全量同步和增量同步,所谓全量同步就是初始化数据一次性同步,而增量同步是指有数据增加的时候,只同步增加的数据。
全量同步
全量同步流程比较复杂,流程如上图:
1:启动一个定时任务线程DistroLoadDataTask加载数据,调用load()方法加载数据
2:调用loadAllDataSnapshotFromRemote()方法从远程机器同步所有的数据
3:从namingProxy代理获取所有的数据data
4:构造http请求,调用httpGet方法从指定的server获取数据
5:从获取的结果result中获取数据bytes
6:处理数据processData
7:从data反序列化出datumMap
8:把数据存储到dataStore,也就是本地缓存dataMap
9:监听器不包括key,就创建一个空的service,并且绑定监听器
10:监听器listener执行成功后,就更新data store
任务启动
在com.alibaba.nacos.core.distributed.distro.DistroProtocol
的构造函数中调用startDistroTask()
方法,该方法会执行startVerifyTask()
和startLoadTask()
,我们重点关注startLoadTask()
,该方法代码如下:
/*** * 启动DistroTask */
private void startDistroTask() {
if (EnvUtil.getStandaloneMode()) {
isInitialized = true;
return;
}
//启动startVerifyTask,做数据同步校验
startVerifyTask();
//启动DistroLoadDataTask,批量加载数据
startLoadTask();
}
//启动DistroLoadDataTask
private void startLoadTask() {
//处理状态回调对象
DistroCallback loadCallback = new DistroCallback() {
//处理成功
@Override
public void onSuccess() {
isInitialized = true;
}
//处理失败
@Override
public void onFailed(Throwable throwable) {
isInitialized = false;
}
};
//执行DistroLoadDataTask,是一个多线程
GlobalExecutor.submitLoadDataTask(
new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}
/*** * 启动startVerifyTask * 数据校验 */
private void startVerifyTask() {
GlobalExecutor.schedulePartitionDataTimedSync(
new DistroVerifyTask(
memberManager,
distroComponentHolder),
distroConfig.getVerifyIntervalMillis());
}
数据如何执行加载
上面方法会调用DistroLoadDataTask
对象,而该对象其实是个线程,因此会执行它的run方法,run方法会调用load()方法实现数据全量加载,代码如下:
/*** * 数据加载过程 */
@Override
public void run() {
try {
//加载数据
load();
if (!checkCompleted()) {
GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
} else {
loadCallback.onSuccess();
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
}
} catch (Exception e) {
loadCallback.onFailed(e);
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
}
}
/*** * 加载数据,并同步 * @throws Exception */
private void load() throws Exception {
while (memberManager.allMembersWithoutSelf().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
}
while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
}
//同步数据
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
//从远程机器上同步所有数据
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
}
}
}
数据同步
数据同步会通过Http请求从远程服务器获取数据,并同步到当前服务的缓存中,执行流程如下:
1:loadAllDataSnapshotFromRemote()从远程加载所有数据,并处理同步到本机
2:transportAgent.getDatumSnapshot()远程加载数据,通过Http请求执行远程加载
3:dataProcessor.processSnapshot()处理数据同步到本地
数据处理完整逻辑代码如下:loadAllDataSnapshotFromRemote()
方法
/*** * 从远程机器上同步所有数据 */
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == transportAgent || null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
resourceType, transportAgent, dataProcessor);
return false;
}
//遍历集群成员节点,不包括自己
for (Member each : memberManager.allMembersWithoutSelf()) {
try {
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
//从远程节点加载数据,调用http请求接口: distro/datums;
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
//处理数据
boolean result = dataProcessor.processSnapshot(distroData);
Loggers.DISTRO
.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
result);
if (result) {
return true;
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
}
}
return false;
}
远程加载数据代码如下:transportAgent.getDatumSnapshot()
方法
/*** * 从namingProxy代理获取所有的数据data,从获取的结果result中获取数据bytes; * @param targetServer target server. * @return */
@Override
public DistroData getDatumSnapshot(String targetServer) {
try {
//从namingProxy代理获取所有的数据data,从获取的结果result中获取数据bytes;
byte[] allDatum = NamingProxy.getAllData(targetServer);
//将数据封装成DistroData
return new DistroData(new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum);
} catch (Exception e) {
throw new DistroException(String.format("Get snapshot from %s failed.", targetServer), e);
}
}
/** * Get all datum from target server. * NamingProxy.getAllData * 执行HttpGet请求,并获取返回数据 * @param server target server address * @return all datum byte array * @throws Exception exception */
public static byte[] getAllData(String server) throws Exception {
//参数封装
Map<String, String> params = new HashMap<>(8);
//组装URL,并执行HttpGet请求,获取结果集
RestResult<String> result = HttpClient.httpGet(
"http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL,
new ArrayList<>(), params);
//返回数据
if (result.ok()) {
return result.getData().getBytes();
}
throw new IOException("failed to req API: " + "http://" + server + EnvUtil.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL + ". code: " + result.getCode() + " msg: "
+ result.getMessage());
}
处理数据同步到本地代码如下:dataProcessor.processSnapshot()
/** * 数据处理并更新本地缓存 * @param data * @return * @throws Exception */
private boolean processData(byte[] data) throws Exception {
if (data.length > 0) {
//从data反序列化出datumMap
Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);
// 把数据存储到dataStore,也就是本地缓存dataMap
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(), entry.getValue());
//监听器不包括key,就创建一个空的service,并且绑定监听器
if (!listeners.containsKey(entry.getKey())) {
// pretty sure the service not exist:
if (switchDomain.isDefaultInstanceEphemeral()) {
// create empty service
//创建一个空的service
Loggers.DISTRO.info("creating service {}", entry.getKey());
Service service = new Service();
String serviceName = KeyBuilder.getServiceName(entry.getKey());
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(Constants.DEFAULT_GROUP);
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
// The Listener corresponding to the key value must not be empty
// 与键值对应的监听器不能为空,这里的监听器类型是 ServiceManager
RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
if (Objects.isNull(listener)) {
return false;
}
//为空的绑定监听器
listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
}
}
}
//循环所有datumMap
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
if (!listeners.containsKey(entry.getKey())) {
// Should not happen:
Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
continue;
}
try {
//执行监听器的onChange监听方法
for (RecordListener listener : listeners.get(entry.getKey())) {
listener.onChange(entry.getKey(), entry.getValue().value);
}
} catch (Exception e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
continue;
}
// Update data store if listener executed successfully:
// 监听器listener执行成功后,就更新dataStore
dataStore.put(entry.getKey(), entry.getValue());
}
}
return true;
}
到此实现数据全量同步,其实全量同步最终封装的协议还是Http。
增量同步
新增数据使用异步广播同步:
1:DistroProtocol 使用 sync() 方法接收增量数据
2:向其他节点发布广播任务
调用 distroTaskEngineHolder 发布延迟任务
3:调用 DistroDelayTaskProcessor.process() 方法进行任务投递:将延迟任务转换为异步变更任务
4:执行变更任务 DistroSyncChangeTask.run() 方法:向指定节点发送消息
调用 DistroHttpAgent.syncData() 方法发送数据
调用 NamingProxy.syncData() 方法发送数据
5:异常任务调用 handleFailedTask() 方法进行处理
调用 DistroFailedTaskHandler 处理失败任务
调用 DistroHttpCombinedKeyTaskFailedHandler 将失败任务重新投递成延迟任务。
增量数据入口
我们回到服务注册,服务注册的InstanceController.register()
就是数据入口,它会调用ServiceManager.registerInstance()
,执行数据同步的时候,调用addInstance()
,在该方法中会执行DistroConsistencyServiceImpl.put()
,该方法是增量同步的入口,会调用distroProtocol.sync()
方法,代码如下:
/*** * 数据保存 * @param key key of data, this key should be globally unique * @param value value of data * @throws NacosException */
@Override
public void put(String key, Record value) throws NacosException {
//将数据存入到dataStore中
onPut(key, value);
//使用distroProtocol同步数据
distroProtocol.sync(
new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX),
DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
sync()
方法会执行任务发布,代码如下:
public void sync(DistroKey distroKey, DataOperation action, long delay) {
//向除了自己外的所有节点广播
for (Member each : memberManager.allMembersWithoutSelf()) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
//从distroTaskEngineHolder获取延时执行引擎,并将distroDelayTask任务添加进来
//执行延时任务发布
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
}
}
}
增量同步操作
延迟任务对象我们可以从DistroTaskEngineHolder
构造函数中得知是DistroDelayTaskProcessor
,代码如下:
/*** * 构造函数指定任务处理器 * @param distroComponentHolder */
public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
//指定任务处理器defaultDelayTaskProcessor
delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}
它延迟执行的时候会执行process
方法,该方法正是执行数据同步的地方,它会执行DistroSyncChangeTask任务,代码如下:
/*** * 任务处理过程 * @param task task. * @return */
@Override
public boolean process(NacosTask task) {
if (!(task instanceof DistroDelayTask)) {
return true;
}
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
//将延迟任务变更成异步任务,异步任务对象是一个线程
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
//将任务添加到NacosExecuteTaskExecuteEngine中,并执行
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true;
}
return false;
}
DistroSyncChangeTask
实质上是任务的开始,它自身是一个线程,所以会执行它的run方法,而run方法这是数据同步操作,代码如下:
/*** * 执行数据同步 */
@Override
public void run() {
Loggers.DISTRO.info("[DISTRO-START] {}", toString());
try {
//获取本地缓存数据
String type = getDistroKey().getResourceType();
DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
distroData.setType(DataOperation.CHANGE);
//向其他节点同步数据
boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
if (!result) {
handleFailedTask();
}
Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
} catch (Exception e) {
Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
handleFailedTask();
}
}
数据同步会执行调用syncData
,该方法其实就是通过Http协议将数据发送到其他节点实现数据同步,代码如下:
/*** * 向其他节点同步数据 * @param data data * @param targetServer target server * @return */
@Override
public boolean syncData(DistroData data, String targetServer) {
if (!memberManager.hasMember(targetServer)) {
return true;
}
//获取数据字节数组
byte[] dataContent = data.getContent();
//通过Http协议同步数据
return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
}
最后:一定要跟着讲师所给的源码自行走一遍!!!
本文由传智教育博学谷 - 狂野架构师教研团队发布
如果本文对您有帮助,欢迎关注和点赞;如果您有任何建议也可留言评论或私信,您的支持是我坚持创作的动力
转载请注明出处!
边栏推荐
猜你喜欢
Read FastDFS in one article
浅谈我国产业园区未来的发展方向
vscode插件设置——Golang开发环境配置
国内首发可视化智能调优平台,小龙带你玩转KeenTune UI
建木DevOps流程的快速运用
七夕活动浪漫上线,别让网络拖慢和小姐姐的开黑时间
FastDFS 一文读懂
全球首款量产,获定点最多!这家AVP Tier1如何实现领跑?
[Miscellaneous] How to install the specified font into the computer and then use the font in the Office software?
Why Flutter Flutter of tutorials is the best choice for business?
随机推荐
LeetCode 0155. 最小栈
我的祖国
第1章:初识数据库与MySQL----MySQL安装
ping数据包中的进程号
DataBinding下的RecycleView适配器Adapter基类
【LeetCode】最长公共子序列(动态规划)
Node.js的基本使用(三)数据库与身份认证
Justin Sun was invited to attend the 36氪 Yuan Universe Summit and delivered a keynote speech
七夕活动浪漫上线,别让网络拖慢和小姐姐的开黑时间
POE交换机全方位解读(下)
绕任意轴旋转矩阵推导
Justin Sun: Web3.0 and the Metaverse will assist mankind to enter the online world more comprehensively
通过whl安装第三方包
Jmeter-参数化
C语言实验十五 文件
JS获得URL超链接的参数值
The longest substring that cannot have repeating characters in a leetcode/substring
关于mnn模型输出的数据杂乱无章问题
学习笔记 | uiautomation(如何)实现自动化
北京电竞元宇宙论坛活动顺利召开