当前位置:网站首页>11. Provider service registration of Nacos service registration source code analysis
11. Provider service registration of Nacos service registration source code analysis
2022-07-03 11:02:00 【May your smile become a breeze】
11.nacos Provider service registration of service registration source code analysis
Source environment description :
springcloud 2020.0.4
springcloud alibaba 2021.1
Refer to the nacos edition :1.4.1
Source code analysis project code address
primary coverage
At present, blogs are mainly nacos-client End code analysis , It mainly introduces how the client communicates with the server , Include :
nacos-clientHow to register the current micro service tonacos-servernacos-clientHow to maintain andnacos-serverThe heart ofnacos-clientEvery time 10s Rotation pull service instance listnacos-clientHow to receivenacos-serverService change push (udp The way )
Service provider service registration source code analysis
nacos Registration center integration springcloud project pom as follows .nacos And springcloud Integration of , Mainly by the following starter To realize
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
At present starter The class structure of the module is as follows :
Description of main assembly classes
Configuration class NacosDiscoveryAutoConfiguration
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {
// assembly nacos Registry properties
@Bean
@ConditionalOnMissingBean
public NacosDiscoveryProperties nacosProperties() {
return new NacosDiscoveryProperties();
}
// NacosServiceDiscovery It encapsulates the method of interacting with the registry to obtain service instances
// Such as :List<ServiceInstance> getInstances(String serviceId)
@Bean
@ConditionalOnMissingBean
public NacosServiceDiscovery nacosServiceDiscovery(
NacosDiscoveryProperties discoveryProperties,
NacosServiceManager nacosServiceManager) {
return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);
}
}
Configuration class NacosDiscoveryEndpointAutoConfiguration Used to expose service health 、 Health indicators and other data .
Configuration class NacosServiceRegistryAutoConfiguration, Assembly service registration related functions ( a key )
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({
AutoServiceRegistrationConfiguration.class,
AutoServiceRegistrationAutoConfiguration.class,
NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {
// Service registration class : Current microservice information , Register by this class to nacos In the service registry
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
// Service registration information
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(registrationCustomizers.getIfAvailable(),
nacosDiscoveryProperties, context);
}
// Service automatic registration class , Called by this class NacosServiceRegistry Registration in
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
}
Service registration process analysis
from WebServerInitializedEvent Event triggered service registration
// AbstractAutoServiceRegistration: springcloud The services provided automatically register top-level encapsulation classes
public abstract class AbstractAutoServiceRegistration<R extends Registration>
implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
// Receive the event callback of service initialization completion
@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
@Deprecated
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
return;
}
}
this.port.compareAndSet(0, event.getWebServer().getPort());
this.start();
}
public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get()) {
// ## Publish service pre registration events
this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
// ## Enforcement registration ( Here from NacosServiceRegistry Realization )
register();
if (shouldRegisterManagement()) {
registerManagement();
}
// Publish service registration completion event
this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
}
}
nacos The client registers the service NacosServiceRegistry
public class NacosServiceRegistry implements ServiceRegistry<Registration> {
@Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
// ## Get the registration service
NamingService namingService = namingService();
// ## The service name ${spring.application.name}
String serviceId = registration.getServiceId();
// ## Service group , Default DEFAULT_GROUP
String group = nacosDiscoveryProperties.getGroup();
// ## Build service information class
Instance instance = getNacosInstanceFromRegistration(registration);
try {
// ### The registration service performs registration ( See below NacosNamingService )
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
// rethrow a RuntimeException if the registration is failed.
// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
rethrowRuntimeException(e);
}
}
}
Registry service class :NamingService
public class NacosNamingService implements NamingService {
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// ## Temporary node , Add heartbeat task ( See below BeatReactor)
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
// Enforcement registration
serverProxy.registerService(groupedServiceName, groupName, instance);
}
}
Classes that actually interact with the registry :NamingProxy
public class NamingProxy implements Closeable {
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
// Construct the parameter information of the current service node
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId); // namespace
params.put(CommonParams.SERVICE_NAME, serviceName); // serviceName
params.put(CommonParams.GROUP_NAME, groupName); // Group information
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); // Cluster information , Default DEFAULT
params.put("ip", instance.getIp()); // Current node ip
params.put("port", String.valueOf(instance.getPort())); // Current node port
params.put("weight", String.valueOf(instance.getWeight())); // Current node weight
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy())); // Current node health status
params.put("ephemeral", String.valueOf(instance.isEphemeral())); // Whether to temporary node
params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); // Metadata
// Perform the requested
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
// Initiate request
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
String method) throws NacosException {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
throw new NacosException(NacosException.INVALID_PARAM, "no server available");
}
NacosException exception = new NacosException();
if (StringUtils.isNotBlank(nacosDomain)) {
for (int i = 0; i < maxRetry; i++) {
// Retry mechanism
try {
#### http call nacos The server registers Services
return callServer(api, params, body, nacosDomain, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
}
}
}
}
NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(),
exception.getErrMsg());
throw new NacosException(exception.getErrCode(),
"failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
}
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
String method) throws NacosException {
long start = System.currentTimeMillis();
long end = 0;
injectSecurityInfo(params);
Header header = builderHeader();
String url;
if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
url = curServer + api;
} else {
if (!IPUtil.containsPort(curServer)) {
curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;
}
url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
}
try {
// ## Use restTemplate Mode request nacos Server side Register for services
HttpRestResult<String> restResult = nacosRestTemplate
.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
.observe(end - start);
if (restResult.ok()) {
return restResult.getData();
}
if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
return StringUtils.EMPTY;
}
throw new NacosException(restResult.getCode(), restResult.getMessage());
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to request", e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}
}
callServer Parameter information of 
Come here , The service registration is completed , Can be in nacos Under the service management of the registry , See the registered service information .
nacos Client heartbeat mechanism BeatReactor
NacosNamingService At first BeatReactor
public class NacosNamingService implements NamingService {
private HostReactor hostReactor; // Main engine reactor : Pull the service list regularly , maintain upd Push service
private BeatReactor beatReactor; // Heartbeat reactor : Report the heartbeat to the server
private NamingProxy serverProxy; // Registry service agent , Communicate with the registration server
private void init(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
this.namespace = InitUtils.initNamespaceForNaming(properties);
InitUtils.initSerialization();
initServerAddr(properties);
InitUtils.initWebRootContext(properties);
initCacheDir();
initLogName(properties);
this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
this.hostReactor = new HostReactor(this.serverProxy, beatReactor, this.cacheDir,
isLoadCacheAtStart(properties),
isPushEmptyProtect(properties),
initPollingThreadCount(properties));
}
}
BeatReactor The code analysis
public class BeatReactor implements Closeable {
// Building functions
public BeatReactor(NamingProxy serverProxy, int threadCount) {
this.serverProxy = serverProxy;
// Created a scheduling thread pool
this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.beat.sender");
return thread;
}
});
}
// from NacosNamingService Call... When the service is registered
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//fix #1733
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
// Start a heartbeat task
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
// Heartbeat Services
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
if (beatInfo.isStopped()) {
return;
}
long nextTime = beatInfo.getPeriod();
try {
// Send a heartbeat
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
// When sending heartbeat , The service is not registered , Then register the service
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
serverProxy.registerService(beatInfo.getServiceName(),
NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
}
// After heartbeat processing , Add another heartbeat task
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
}
Data sent by heartbeat 
Pull service information regularly HostReactor
public class HostReactor implements Closeable {
// Local service cache
private final Map<String, ServiceInfo> serviceInfoMap;
public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir, boolean loadCacheAtStart,
boolean pushEmptyProtection, int pollingThreadCount) {
// init executorService
// # Task scheduling thread pool
this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.client.naming.updater");
return thread;
}
});
// # Heartbeat response heap
this.beatReactor = beatReactor;
this.serverProxy = serverProxy;
this.cacheDir = cacheDir;
if (loadCacheAtStart) {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
} else {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
}
this.pushEmptyProtection = pushEmptyProtection;
this.updatingMap = new ConcurrentHashMap<String, Object>();
// Handling error messages : Local file cache failure processing
this.failoverReactor = new FailoverReactor(this, cacheDir);
// ## ** initialization UDP Receiving services
this.pushReceiver = new PushReceiver(this);
this.notifier = new InstancesChangeNotifier();
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
NotifyCenter.registerSubscriber(notifier);
}
// Update the service list by service name
public void updateService(String serviceName, String clusters) throws NacosException {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
// Go to the remote server to query the service list
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
// Process the obtained service list , And save to serviceInfoMap In cache
if (StringUtils.isNotEmpty(result)) {
processServiceJson(result);
}
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
// Regular update service
public class UpdateTask implements Runnable {
long lastRefTime = Long.MAX_VALUE;
private final String clusters;
private final String serviceName;
/** * the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty */
private int failCount = 0;
public UpdateTask(String serviceName, String clusters) {
this.serviceName = serviceName;
this.clusters = clusters;
}
private void incFailCount() {
int limit = 6;
if (failCount == limit) {
return;
}
failCount++;
}
private void resetFailCount() {
failCount = 0;
}
@Override
public void run() {
// Default 10s
long delayTime = DEFAULT_DELAY;
try {
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
if (serviceObj == null) {
// The local cache doesn't have , Pull from the server , Go straight back to
updateService(serviceName, clusters);
return;
}
// The server push was not received , Refresh the service list information
if (serviceObj.getLastRefTime() <= lastRefTime) {
updateService(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
// If you receive a push from the server during , There is no need to update
refreshOnly(serviceName, clusters);
}
lastRefTime = serviceObj.getLastRefTime();
if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
// abort the update task
NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
return;
}
if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
incFailCount();
return;
}
delayTime = serviceObj.getCacheMillis(); // 10s
resetFailCount();
} catch (Throwable e) {
incFailCount();
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
} finally {
// Add another scheduled task , Pull the service list information .
// Decay retry mechanism
executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
}
}
}
}
When refreshing the service list , Will send to the server upd Push related information :udp port 
Receive service push PushReceiver Udp Server side
public class PushReceiver implements Runnable, Closeable {
public PushReceiver(HostReactor hostReactor) {
try {
this.hostReactor = hostReactor;
// An available port will be randomly bound , As udp Push port of
this.udpSocket = new DatagramSocket();
// A thread pool for a thread
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
}
});
// Start the current thread , perform run Method
this.executorService.execute(this);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] init udp socket failed", e);
}
}
@Override
public void run() {
while (!closed) {
try {
// byte[] is initialized with 0 full filled by default
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
// udp Receive the data sent by the service
udpSocket.receive(packet);
String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
String ack;
// If dom change , Or service change
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
// Update service data
hostReactor.processServiceJson(pushPacket.data);
// send ack to server
ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"\"}";
} else if ("dump".equals(pushPacket.type)) {
// dump data to server
ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
+ "\"}";
} else {
// do nothing send ack only
ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
}
// Send a response
udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
packet.getSocketAddress()));
} catch (Exception e) {
if (closed) {
return;
}
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}
}
边栏推荐
- 8年测试总监的行业思考,看完后测试思维认知更深刻
- QT:QSS自定义QLineEdit实例
- QT:QSS自定义QTableView实例
- Qt:qss custom qstatusbar instance
- 那些一門心思研究自動化測試的人,後來怎樣了?
- In the middle of the year, I have prepared a small number of automated interview questions. Welcome to the self-test
- QT:QSS自定义 QSplitter实例
- QT:QSS自定义QToolButton实例
- Is it OK to test the zero basis software?
- MAUI Developer Day in GCR
猜你喜欢

Hard goods | write all the codes as soon as you change the test steps? Why not try yaml to realize data-driven?

I, a tester from a large factory, went to a state-owned enterprise with a 50% pay cut. I regret it

STM32F1与STM32CubeIDE编程实例-TM1637驱动4位7段数码管

Take you into the cloud native database industry, Amazon Aurora

QT:QSS自定义 QProgressBar实例

有些能力,是工作中学不来的,看看这篇超过90%同行

Large scale e-commerce project - environment construction

What happened to those who focused on automated testing?

Multiple IO transfer - preamble
The normal one inch is 25.4 cm, and the image field is 16 cm
随机推荐
What experience is there only one test in the company? Listen to what they say
Extern keyword
QT:QSS自定义 QTabWidget 和 QTabBar实例
QT:QSS自定义QLineEdit实例
Day 7 small exercise
月薪过万的测试员,是一种什么样的生活状态?
最高月薪18K 拥有好的“心态和选择”, 成功就差“认真和坚持”~
Matlab memory variable management command
Data captured
Basic usage of sqlmap
独家分析 | 关于简历和面试的真 相
Take you into the cloud native database industry, Amazon Aurora
In the middle of the year, I have prepared a small number of automated interview questions. Welcome to the self-test
What is the salary level of 17k? Let's take a look at the whole interview process of post-95 Test Engineers
Programming examples of stm32f1 and stm32subeide -tm1637 drives 4-bit 7-segment nixie tubes
Cache routing component
《通信软件开发与应用》
MAUI Developer Day in GCR
Solve the problem that pycharm Chinese input method does not follow
那些一門心思研究自動化測試的人,後來怎樣了?