Nacos Source Code Analysis
The Nacos source code has a lot worth learning from. To understand Nacos better, we analyze the source code with a focus on the following two topics:
1: How a service connects to the Nacos registry
2: How Nacos processes a service registration
Setting up the source environment
1. Clone the official project, check out version 1.4.1, and import it into IDEA. The Nacos source environment is easy to set up and rarely produces errors. After importing, compile the project and install it locally.
2. In the config module, find \resources\META-INF\nacos-db.sql, create a database named nacos-config in your local MySQL, and run the script to create the tables.
3. In the console module, find the configuration file application.properties and modify the relevant settings:
#*************** Config Module Related Configurations ***************#
### If use MySQL as datasource:
spring.datasource.platform=mysql
### Count of DB:
db.num=1
### Connect URL of DB:
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos-config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
db.user.0=nacos
db.password.0=nacos
4. In the console module, find the startup class and start the Nacos server. Add a startup parameter so that it runs in standalone (non-cluster) mode:
-Dnacos.standalone=true
5. Visit the local Nacos console: http://localhost:8848/nacos
With that, the source environment is ready.
Nacos client
The first thing to know is that the Nacos client lives inside our own service: we add the Nacos dependency coordinates, and the client works within our service as a jar package.
What does the Nacos client do for us?
Its main jobs are service registration, service discovery, service deregistration (taking a service offline), service subscription, and related operations.
The interaction between the client and the registry server centers on these four operations, of which registration and discovery are used most. The sections below analyze all four from the source code.
In the Nacos source, the class com.alibaba.nacos.example.NamingExample in the nacos-example module demonstrates these 4 operations, so we can use it as an entry point. The code is as follows:
public class NamingExample {
public static void main(String[] args) throws NacosException {
Properties properties = new Properties();
properties.setProperty("serverAddr", System.getProperty("serverAddr"));
properties.setProperty("namespace", System.getProperty("namespace"));
NamingService naming = NamingFactory.createNamingService(properties);
// Service registration
naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");
naming.registerInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
// Service discovery
System.out.println(naming.getAllInstances("nacos.test.3"));
// Service offline
naming.deregisterInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
System.out.println(naming.getAllInstances("nacos.test.3"));
// Service subscription
Executor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("test-thread");
return thread;
}
});
naming.subscribe("nacos.test.3", new AbstractEventListener() {
//EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback.
//So you can override getExecutor() to async handle event.
@Override
public Executor getExecutor() {
return executor;
}
@Override
public void onEvent(Event event) {
System.out.println(((NamingEvent) event).getServiceName());
System.out.println(((NamingEvent) event).getInstances());
}
});
}
}
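One caveat when running the example: System.getProperty returns null when the corresponding -D flag is missing, and Properties.setProperty rejects null values with a NullPointerException. A minimal sketch of building the properties with fallbacks follows. The class name NamingProps and the default values (127.0.0.1:8848 and the public namespace) are assumptions for a local standalone server, not part of the Nacos example.

```java
import java.util.Properties;

public class NamingProps {

    /** Builds client properties, falling back to defaults when a -D flag is absent. */
    public static Properties build() {
        Properties properties = new Properties();
        // System.getProperty(key, def) returns def instead of null when the key is unset.
        properties.setProperty("serverAddr", System.getProperty("serverAddr", "127.0.0.1:8848"));
        properties.setProperty("namespace", System.getProperty("namespace", "public"));
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```

The resulting Properties object can be passed to NamingFactory.createNamingService(properties) exactly as in the example above.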
1.1.1 Service registration
Following the service registration call in the example, we arrive at NamingService.registerInstance() in nacos-api and at its implementation in com.alibaba.nacos.client.naming.NacosNamingService. The code is as follows:
/***
* Service registration
* @param serviceName Service name
* @param ip service IP
* @param port Service port
* @param clusterName The cluster name
* @throws NacosException
*/
@Override
public void registerInstance(String serviceName, String ip, int port, String clusterName) throws NacosException {
registerInstance(serviceName, Constants.DEFAULT_GROUP, ip, port, clusterName);
}
@Override
public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
throws NacosException {
// Set the instance IP and port; the default weight is 1.0
Instance instance = new Instance();
instance.setIp(ip);
instance.setPort(port);
instance.setWeight(1.0);
instance.setClusterName(clusterName);
// Registration instance
registerInstance(serviceName, groupName, instance);
}
@Override
public void registerInstance(String serviceName, Instance instance) throws NacosException {
registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
}
/***
* Instance registration
* @param serviceName name of service
* @param groupName group of service
* @param instance instance to register
* @throws NacosException
*/
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// isEphemeral() indicates whether the registered instance is ephemeral (temporary) or persistent.
// An ephemeral instance is not persisted on the Nacos server; it must report heartbeats to stay alive.
// If no heartbeat is reported for a while, the Nacos server removes the instance.
if (instance.isEphemeral()) {
// Schedule a heartbeat task for the registered instance; by default it reports every 5s
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
// Register to the server
serverProxy.registerService(groupedServiceName, groupName, instance);
}
Registration mainly does two things: it schedules a heartbeat task for the registered instance, and it registers the instance with the server.
1: Start a scheduled heartbeat task with a 5s interval. If the service is in a normal state, nothing else is done; otherwise the instance is registered again.
2: Send an HTTP request to the service registration interface of the registry server to register the service.
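To make the heartbeat half of this concrete, here is a minimal, self-contained sketch of a per-instance heartbeat scheduler. It is not the Nacos BeatReactor: the class MiniBeatReactor, its method names, and the serviceName#ip#port key format are assumptions made for illustration, and the Runnable stands in for the HTTP heartbeat call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified stand-in for a client-side heartbeat scheduler. */
public class MiniBeatReactor {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "mini-beat-sender");
                t.setDaemon(true); // do not keep the JVM alive just for heartbeats
                return t;
            });

    private final Map<String, ScheduledFuture<?>> beats = new ConcurrentHashMap<>();

    /** Assumed key format for this sketch: serviceName#ip#port. */
    public static String buildKey(String serviceName, String ip, int port) {
        return serviceName + "#" + ip + "#" + port;
    }

    /** Schedules sendBeat at a fixed period; replaces any earlier task for the same key. */
    public void addBeat(String key, Runnable sendBeat, long periodMillis) {
        ScheduledFuture<?> future =
                scheduler.scheduleAtFixedRate(sendBeat, 0, periodMillis, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> previous = beats.put(key, future);
        if (previous != null) {
            previous.cancel(false);
        }
    }

    /** Cancels the heartbeat for the key; returns false if none was scheduled. */
    public boolean removeBeat(String key) {
        ScheduledFuture<?> future = beats.remove(key);
        if (future == null) {
            return false;
        }
        future.cancel(false);
        return true;
    }

    public void shutdown() {
        scheduler.shutdownNow();
    }
}
```

With a 5000 ms period, addBeat corresponds to the scheduling step of registration and removeBeat to the cleanup step of deregistration. Keeping one future per key is what lets a single instance be taken offline without touching the others.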
The code above shows the scheduled task being added, but not the full remote request. serverProxy.registerService() is shown below: it first packs the request parameters and then calls reqApi(), which eventually calls callServer(). The code is as follows:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
// Build the HTTP request parameters
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
// Execute the HTTP request
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
/***
* Make a remote call
**/
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
String method) throws NacosException {
long start = System.currentTimeMillis();
long end = 0;
injectSecurityInfo(params);
// Build the request header
Header header = builderHeader();
// Decide whether the request uses HTTP or HTTPS
String url;
if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
url = curServer + api;
} else {
if (!IPUtil.containsPort(curServer)) {
curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;
}
url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
}
try {
// Execute the remote request and get the result
HttpRestResult<String> restResult = nacosRestTemplate
.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
.observe(end - start);
// Parse the result
if (restResult.ok()) {
return restResult.getData();
}
if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
return StringUtils.EMPTY;
}
throw new NacosException(restResult.getCode(), restResult.getMessage());
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to request", e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}
The remote HTTP request is executed by NacosRestTemplate, which wraps ordinary HTTP requests; you can explore it on your own.
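The URL handling in callServer() and the parameter packing in registerService() can be tried out in isolation. The following is a rough sketch, not the Nacos code: the class RegisterRequestSketch and its methods are hypothetical, the http:// prefix and the /nacos/v1/ns/instance path are assumptions about a default 1.x setup, and the port check is simplified.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class RegisterRequestSketch {

    /** Mirrors the branch in callServer(): keep an explicit scheme, otherwise add prefix and port. */
    public static String buildUrl(String curServer, int defaultPort, String api) {
        if (curServer.startsWith("https://") || curServer.startsWith("http://")) {
            return curServer + api;
        }
        // Simplification: any ':' counts as a port separator, so IPv6 literals are not handled.
        if (!curServer.contains(":")) {
            curServer = curServer + ":" + defaultPort;
        }
        return "http://" + curServer + api; // assumed prefix when TLS is not enabled
    }

    /** Encodes parameters as application/x-www-form-urlencoded key=value pairs. */
    public static String encodeForm(Map<String, String> params) {
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            joiner.add(encode(entry.getKey()) + "=" + encode(entry.getValue()));
        }
        return joiner.toString();
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", "DEFAULT_GROUP@@nacos.test.3");
        params.put("ip", "11.11.11.11");
        params.put("port", "8888");
        params.put("ephemeral", "true");
        System.out.println(buildUrl("127.0.0.1", 8848, "/nacos/v1/ns/instance"));
        System.out.println(encodeForm(params));
    }
}
```

Printing the URL and the encoded form side by side makes it easier to compare against what the server receives when debugging a registration that does not show up in the console.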
1.1.2 Service discovery
Two entry points:
1. NamingService.getAllInstances(serviceName)
Following the service discovery call in the example, we arrive at NamingService.getAllInstances() in nacos-api and at its implementation, com.alibaba.nacos.client.naming.NacosNamingService.getAllInstances(). The code is as follows:
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
/* subscribe defaults to true */
if (subscribe) {
// Read from the local cache; if the cache has no entry, pull it from the server.
// The local cache is HostReactor.serviceInfoMap, a Map object.
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor
.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}
The code above calls hostReactor.getServiceInfo(). This method first calls getServiceInfo0() to read from the local cache. If the cache has no data, it creates a new ServiceInfo, puts it in the cache, and fetches the latest data from Nacos. The source of getServiceInfo() is as follows:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
/* 1. First look up the service object in the local cache; on the first call the entry does not exist yet */
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) {
/* Build the service object */
serviceObj = new ServiceInfo(serviceName, clusters);
/* Store the service object in the cache */
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
/* Mark the service as being updated */
updatingMap.put(serviceName, new Object());
/* Actively fetch from the server and update the local cache */
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
/* 2. Start the scheduled update task */
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
updateServiceNow(serviceName, clusters) actively fetches the latest data from the remote server. It eventually calls updateService(), which performs the remote request and processes the data. The source is as follows:
public void updateService(String serviceName, String clusters) throws NacosException {
/* Get the service currently held in the local cache */
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
/* Query the service and its provider information (IP, port, etc.) from the server */
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
if (StringUtils.isNotEmpty(result)) {
/* Process the result */
processServiceJson(result);
}
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
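The cache-first pattern behind getServiceInfo() and updateService() can be reduced to a small sketch. This is not how HostReactor is written (it coordinates through updatingMap and wait/notifyAll); the class CacheFirstLookup, its methods, and the name@@clusters key format are assumptions for illustration, and the Function stands in for the remote query.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical cache-first lookup: serve from memory, go remote only on a miss. */
public class CacheFirstLookup<V> {

    private final Map<String, V> cache = new ConcurrentHashMap<>();
    private final Function<String, V> remoteFetch;

    public CacheFirstLookup(Function<String, V> remoteFetch) {
        this.remoteFetch = remoteFetch;
    }

    /** Assumed key format: name, or name@@clusters when clusters is non-empty. */
    public static String key(String serviceName, String clusters) {
        return (clusters == null || clusters.isEmpty()) ? serviceName : serviceName + "@@" + clusters;
    }

    /** Returns the cached value, calling the remote source only when the key is missing. */
    public V get(String serviceName, String clusters) {
        return cache.computeIfAbsent(key(serviceName, clusters), remoteFetch);
    }

    /** Forces a remote fetch and overwrites the cache, as a scheduled update task would. */
    public V refresh(String serviceName, String clusters) {
        String k = key(serviceName, clusters);
        V fresh = remoteFetch.apply(k);
        if (fresh != null) {
            cache.put(k, fresh);
        }
        return cache.get(k);
    }
}
```

Here computeIfAbsent makes concurrent callers for the same key wait for a single fetch, which plays a role similar to the updatingMap check in the original code, though the mechanism is different.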
1.1.3 Service offline
Following the service deregistration call in the example, we arrive at NamingService.deregisterInstance() in nacos-api and at its implementation, NacosNamingService.deregisterInstance(). The code is as follows:
@Override
public void deregisterInstance(String serviceName, String groupName, String ip, int port, String clusterName)
throws NacosException {
// Build instance information
Instance instance = new Instance();
instance.setIp(ip);
instance.setPort(port);
instance.setClusterName(clusterName);
// Service offline operation
deregisterInstance(serviceName, groupName, instance);
}
@Override
public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) {
// Remove the scheduled heartbeat task
beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(),
instance.getPort());
}
// Send a remote request to take the instance offline
serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
}
Deregistration is relatively simple and does the opposite of registration. It also does two things: it stops the heartbeat task, and it calls the deregistration interface on the server.
1.1.4 Service subscription
Looking at the subscription example, a thread pool is created first and then handed to the listener through getExecutor(). The listener receives events for the specified service and can read its instance information. The code is as follows:
// Service subscription
Executor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("test-thread");
return thread;
}
});
naming.subscribe("nacos.test.3", new AbstractEventListener() {
//EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback.
//So you can override getExecutor() to async handle event.
@Override
public Executor getExecutor() {
return executor;
}
// Read the instances of the monitored service
@Override
public void onEvent(Event event) {
System.out.println(((NamingEvent) event).getServiceName());
System.out.println(((NamingEvent) event).getInstances());
}
});
Following the service subscription call in the example, we arrive at NamingService.subscribe() in nacos-api and at its implementation, NacosNamingService.subscribe(), which eventually reaches the subscribe() method below. The code is as follows:
public void subscribe(String serviceName, String clusters, EventListener eventListener) {
// Register to listen
notifier.registerListener(serviceName, clusters, eventListener);
// Get and update service instances
getServiceInfo(serviceName, clusters);
}
This registers the listener: the listener object is added to the listenerMap collection, and its onEvent method can then read the instance information. The code is as follows:
public void registerListener(String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(serviceName, clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (eventListeners == null) {
synchronized (lock) {
eventListeners = listenerMap.get(key);
if (eventListeners == null) {
eventListeners = new ConcurrentHashSet<EventListener>();
listenerMap.put(key, eventListeners);
}
}
}
// Add the listener to the set; its onEvent method can read the corresponding instances
eventListeners.add(listener);
}
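The registry-of-listeners idea can be exercised without a Nacos server. Below is a minimal sketch under stated assumptions: MiniNotifier, its Listener interface, and the publish method are hypothetical names, instances are plain strings, and computeIfAbsent is swapped in for the double-checked lock used in the original.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

/** Hypothetical, simplified listener registry keyed by service. */
public class MiniNotifier {

    /** Hypothetical listener type: receives the service key and its current instances. */
    public interface Listener {
        void onEvent(String serviceKey, List<String> instances);

        /** Return a non-null executor to handle events off the publishing thread. */
        default Executor getExecutor() {
            return null;
        }
    }

    private final Map<String, Set<Listener>> listenerMap = new ConcurrentHashMap<>();

    public void registerListener(String serviceKey, Listener listener) {
        // computeIfAbsent creates the set atomically, so no explicit lock is needed.
        listenerMap.computeIfAbsent(serviceKey, k -> ConcurrentHashMap.newKeySet()).add(listener);
    }

    /** Delivers the instance list to every listener registered for the key. */
    public void publish(String serviceKey, List<String> instances) {
        Set<Listener> listeners = listenerMap.get(serviceKey);
        if (listeners == null) {
            return;
        }
        for (Listener listener : listeners) {
            Executor executor = listener.getExecutor();
            if (executor != null) {
                executor.execute(() -> listener.onEvent(serviceKey, instances));
            } else {
                listener.onEvent(serviceKey, instances);
            }
        }
    }
}
```

The optional executor mirrors the reason the example overrides getExecutor(): a slow onEvent running on the publishing thread would delay every other listener.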
This article was released by the Wild Architects teaching and research team of Wisdom Education Valley.
Reprint please indicate the source !