Tearing into the Nacos source code by hand (client source first)

2022-07-07 18:05:00 Erudite Valley wild architect

Nacos source code analysis

The Nacos source code has a lot worth learning from. To understand Nacos in depth, we analyze its source code, covering the following 2 points:

1: How Nacos accesses the registry

2: The Nacos service registration flow

Setting up the source environment

1. Clone the official project, check out version 1.4.1, and import it into IDEA. The Nacos source environment is easy to set up and almost never runs into errors. After importing, compile and install it into the local environment.

2. In the config module, find \resources\META-INF\nacos-db.sql. Create a database named nacos-config in your local MySQL and run the script to create the tables.

3. In the console module, find the configuration file application.properties and modify the relevant settings:

#*************** Config Module Related Configurations ***************#
### If use MySQL as datasource:
spring.datasource.platform=mysql

### Count of DB:
db.num=1

### Connect URL of DB:
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos-config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
db.user.0=nacos
db.password.0=nacos

4. Find the startup class in the console module and start the Nacos server. Add the following startup parameter to run in standalone (non-cluster) mode:

-Dnacos.standalone=true

5. Visit the local Nacos console: http://localhost:8848/nacos

With that, the source environment is up and running!

Nacos client

The first thing to know: the Nacos client actually lives inside our own service. Once we add the Nacos dependency coordinates, the Nacos client runs inside our service as a jar.

What does the Nacos client do for us?

Its main jobs are service registration, service discovery, taking services offline, and service subscription.

The interaction between the client and the registry server revolves around these four operations: service registration, service deregistration, service discovery, and service subscription. Registration and discovery are used the most. Below, I analyze all four from the perspective of the source code.

In the Nacos source, the class com.alibaba.nacos.example.NamingExample in the nacos-example module demonstrates these 4 operations. We can use it as the entry point. The code is as follows:

public class NamingExample {

    public static void main(String[] args) throws NacosException {

        Properties properties = new Properties();
        properties.setProperty("serverAddr", System.getProperty("serverAddr"));
        properties.setProperty("namespace", System.getProperty("namespace"));

        NamingService naming = NamingFactory.createNamingService(properties);
        // Service registration 
        naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");
        naming.registerInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
        // Service discovery 
        System.out.println(naming.getAllInstances("nacos.test.3"));
        // Service offline 
        naming.deregisterInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
        System.out.println(naming.getAllInstances("nacos.test.3"));
        // Service subscription 
        Executor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("test-thread");
                        return thread;
                    }
                });

        naming.subscribe("nacos.test.3", new AbstractEventListener() {

            //EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback.
            //So you can override getExecutor() to async handle event.
            @Override
            public Executor getExecutor() {
                return executor;
            }

            @Override
            public void onEvent(Event event) {
                System.out.println(((NamingEvent) event).getServiceName());
                System.out.println(((NamingEvent) event).getInstances());
            }
        });
    }
}
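One thing to watch when running this example: System.getProperty("serverAddr") returns null if the JVM was started without -DserverAddr=..., and Properties.setProperty rejects a null value with a NullPointerException. Below is a minimal sketch of a defensive variant. The class name and both fallback values (a local standalone server at 127.0.0.1:8848 and the namespace "public") are assumptions made for illustration, not something the example itself sets.

```java
import java.util.Properties;

class NamingPropertiesSketch {

    // Fallback values are assumptions for a local standalone setup.
    static final String DEFAULT_SERVER_ADDR = "127.0.0.1:8848";
    static final String DEFAULT_NAMESPACE = "public";

    /** Builds the client properties without ever passing null to setProperty. */
    static Properties build() {
        Properties properties = new Properties();
        // The two-argument getProperty returns the fallback when the -D flag is missing.
        properties.setProperty("serverAddr", System.getProperty("serverAddr", DEFAULT_SERVER_ADDR));
        properties.setProperty("namespace", System.getProperty("namespace", DEFAULT_NAMESPACE));
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```

The resulting Properties object could then be passed to NamingFactory.createNamingService(properties) exactly as in the example.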

1.1.1 Service registration

Following the service registration call in the example, we arrive at NamingService.registerInstance() in nacos-api and then at its implementation class com.alibaba.nacos.client.naming.NacosNamingService. The code is as follows:

/**
 * Service registration.
 *
 * @param serviceName service name
 * @param ip          service IP
 * @param port        service port
 * @param clusterName cluster name
 * @throws NacosException
 */
@Override
public void registerInstance(String serviceName, String ip, int port, String clusterName) throws NacosException {
    registerInstance(serviceName, Constants.DEFAULT_GROUP, ip, port, clusterName);
}

@Override
public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
        throws NacosException {
    // Set instance IP:Port, The default weight is 1.0
    Instance instance = new Instance();
    instance.setIp(ip);
    instance.setPort(port);
    instance.setWeight(1.0);
    instance.setClusterName(clusterName);
    // Register the instance
    registerInstance(serviceName, groupName, instance);
}

@Override
public void registerInstance(String serviceName, Instance instance) throws NacosException {
    registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
}

/**
 * Instance registration.
 * @param serviceName name of service
 * @param groupName   group of service
 * @param instance    instance to register
 * @throws NacosException
 */
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // isEphemeral() indicates whether the registered instance is ephemeral or persistent.
    // An ephemeral instance is not persisted on the Nacos server and must report heartbeats to stay alive;
    // if no heartbeat arrives for a while, the Nacos server removes it.
    if (instance.isEphemeral()) {
        // Set up a scheduled heartbeat task for the registered service; by default it reports every 5s
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    // Register to the server 
    serverProxy.registerService(groupedServiceName, groupName, instance);
}

Registration mainly does two things. First, it sets up a scheduled heartbeat task for the registered service. Second, it registers the service with the server.

1: Start a scheduled heartbeat task with an interval of 5s. If the service is healthy, nothing more is done; if not, it is re-registered

2: Send an HTTP request to the registry server, calling the service registration interface to register the service

The code above shows the scheduled task being added, but not the full remote request. serverProxy.registerService() is shown below: it first packs the request parameters and then calls reqApi(), which ultimately calls callServer(). The code is as follows:
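The heartbeat step described above can be sketched with plain JDK classes. This is only an illustration of the pattern (a task that sends a beat, re-registers when the server no longer knows the instance, and then schedules its own next run), not the actual BeatReactor code. HeartbeatSketch, BeatSender, and demo() are hypothetical names, and the demo uses a 20 ms interval instead of 5s so it finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HeartbeatSketch {

    /** Hypothetical stand-in for the HTTP heartbeat call; false means the server does not know the instance. */
    interface BeatSender {
        boolean send(String serviceName);
    }

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "beat-sender-sketch");
        t.setDaemon(true);
        return t;
    });
    private volatile boolean stopped;

    /** Schedules one heartbeat after the given delay. */
    void start(String serviceName, long periodMillis, BeatSender sender, Runnable reRegister) {
        scheduler.schedule(() -> beat(serviceName, periodMillis, sender, reRegister),
                periodMillis, TimeUnit.MILLISECONDS);
    }

    private void beat(String serviceName, long periodMillis, BeatSender sender, Runnable reRegister) {
        if (stopped) {
            return;
        }
        try {
            // Healthy: nothing else to do. Unknown to the server: register again.
            if (!sender.send(serviceName)) {
                reRegister.run();
            }
        } finally {
            if (!stopped) {
                // Each run schedules the next one.
                start(serviceName, periodMillis, sender, reRegister);
            }
        }
    }

    void stop() {
        stopped = true;
        scheduler.shutdownNow();
    }

    /** Simulates a server that rejects the third beat; returns how many re-registrations happened. */
    static int demo() {
        HeartbeatSketch sketch = new HeartbeatSketch();
        AtomicInteger beats = new AtomicInteger();
        AtomicInteger reRegistrations = new AtomicInteger();
        CountDownLatch reRegistered = new CountDownLatch(1);
        sketch.start("DEFAULT_GROUP@@nacos.test.3", 20,
                name -> beats.incrementAndGet() != 3,
                () -> {
                    reRegistrations.incrementAndGet();
                    reRegistered.countDown();
                });
        try {
            if (!reRegistered.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no re-registration within 5s");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            sketch.stop();
        }
        return reRegistrations.get();
    }

    public static void main(String[] args) {
        System.out.println("re-registrations: " + demo());
    }
}
```

Rescheduling from inside the task, rather than using a fixed-rate schedule, means a slow beat delays the next one instead of piling up behind it.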

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
            instance);
    // Build the HTTP request parameters
    final Map<String, String> params = new HashMap<String, String>(16);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
    // Send the HTTP request
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}


/**
 * Makes the remote call.
 */
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
        String method) throws NacosException {
    long start = System.currentTimeMillis();
    long end = 0;
    injectSecurityInfo(params);
    // Build the request header
    Header header = builderHeader();
    
    // Decide whether the request uses the HTTP or HTTPS protocol
    String url;
    if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
        url = curServer + api;
    } else {
        if (!IPUtil.containsPort(curServer)) {
            curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;
        }
        url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
    }

    try {
        // Execute the remote request and get the result
        HttpRestResult<String> restResult = nacosRestTemplate
                .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
        end = System.currentTimeMillis();
        
        MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
                .observe(end - start);
        // Parse the result
        if (restResult.ok()) {
            return restResult.getData();
        }
        if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
            return StringUtils.EMPTY;
        }
        throw new NacosException(restResult.getCode(), restResult.getMessage());
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to request", e);
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }
}

The object that performs the remote HTTP request is NacosRestTemplate, which wraps an ordinary HTTP request. You can look into it on your own.
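The URL-building branch in callServer() is easy to miss, so here is a standalone sketch of the same decision. It is a simplification under stated assumptions: ServerUrlSketch and buildUrl are hypothetical names, the port check is a plain contains(":") instead of IPUtil.containsPort, the default port is assumed to be 8848, and the scheme is passed in as a boolean instead of coming from NamingHttpClientManager.

```java
class ServerUrlSketch {

    static final String HTTP = "http://";
    static final String HTTPS = "https://";
    // Assumed default server port for this sketch.
    static final int DEFAULT_PORT = 8848;

    /**
     * Builds the request URL for one server entry.
     * An entry that already carries a scheme is used as-is; otherwise the
     * default port is appended when missing and a scheme prefix is added.
     */
    static String buildUrl(String curServer, String api, boolean tls) {
        if (curServer.startsWith(HTTPS) || curServer.startsWith(HTTP)) {
            return curServer + api;
        }
        // Simplistic port check: good enough for IPv4 addresses and host names.
        if (!curServer.contains(":")) {
            curServer = curServer + ":" + DEFAULT_PORT;
        }
        return (tls ? HTTPS : HTTP) + curServer + api;
    }

    public static void main(String[] args) {
        String api = "/nacos/v1/ns/instance";
        System.out.println(buildUrl("127.0.0.1", api, false));
        System.out.println(buildUrl("http://nacos.example.com:8848", api, false));
        System.out.println(buildUrl("10.0.0.5:8849", api, true));
    }
}
```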

1.1.2 Service discovery

Two entry points:

1. NamingService.getAllInstances(serviceName)

Following the service discovery call in the example, we arrive at NamingService.getAllInstances() in nacos-api and then at its implementation com.alibaba.nacos.client.naming.NacosNamingService.getAllInstances(). The code is as follows:

@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
        boolean subscribe) throws NacosException {

    ServiceInfo serviceInfo;
    /* subscribe defaults to true -> get the service instances */
    if (subscribe) {
        // Read from the local cache; if the cache has no entry, pull from the server
        // The local cache is kept in HostReactor.serviceInfoMap, which is a Map
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                StringUtils.join(clusters, ","));
    } else {
        serviceInfo = hostReactor
                .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
                        StringUtils.join(clusters, ","));
    }
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }
    return list;
}

The code above calls hostReactor.getServiceInfo(). This method first calls getServiceInfo0() to read from the local cache. If the cache has no data, it builds a new service object, stores it in the cache, and then fetches the latest data from Nacos. The source of getServiceInfo() is as follows:

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    /*1. First get the service object from the local cache; on the first call the cache is empty */
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

    if (null == serviceObj) {
        /* Build the service object */
        serviceObj = new ServiceInfo(serviceName, clusters);
        /* Store the service object in the cache */
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        /* Mark the service as being updated */
        updatingMap.put(serviceName, new Object());
        /* Actively fetch from the server and update the local cache */
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);

    } else if (updatingMap.containsKey(serviceName)) {

        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER
                            .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }
    /*2. Schedule the periodic update task if it is not already scheduled */
    scheduleUpdateIfAbsent(serviceName, clusters);
    return serviceInfoMap.get(serviceObj.getKey());
}
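Stripped of failover and scheduling, the lookup above is a cache-first read: consult the local map and go to the server only on a miss. The sketch below shows just that core idea with JDK classes. ServiceCacheSketch, its String-based instance lists, the "@@" key format, and the remoteCalls counter are assumptions made for illustration, not the HostReactor implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class ServiceCacheSketch {

    // Local cache, playing the role that serviceInfoMap plays above.
    private final Map<String, List<String>> serviceInfoMap = new ConcurrentHashMap<>();
    // Hypothetical stand-in for the remote query to the server.
    private final Function<String, List<String>> remoteQuery;
    // Counts remote round trips so the caching effect is observable.
    final AtomicInteger remoteCalls = new AtomicInteger();

    ServiceCacheSketch(Function<String, List<String>> remoteQuery) {
        this.remoteQuery = remoteQuery;
    }

    /** Returns cached instances, querying the remote side only when the cache has no entry. */
    List<String> getInstances(String serviceName, String clusters) {
        // Illustrative key format; the real key comes from ServiceInfo.getKey().
        String key = serviceName + "@@" + clusters;
        List<String> cached = serviceInfoMap.get(key);
        if (cached == null) {
            remoteCalls.incrementAndGet();
            cached = remoteQuery.apply(key);
            serviceInfoMap.put(key, cached);
        }
        return cached;
    }

    public static void main(String[] args) {
        ServiceCacheSketch cache = new ServiceCacheSketch(key -> Arrays.asList("11.11.11.11:8888"));
        System.out.println(cache.getInstances("nacos.test.3", "TEST1"));
        System.out.println(cache.getInstances("nacos.test.3", "TEST1"));
        System.out.println("remote calls: " + cache.remoteCalls.get());
    }
}
```

Unlike this sketch, the method above also schedules a periodic refresh, so cached entries do not stay stale indefinitely.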

updateServiceNow(serviceName, clusters) actively fetches the latest data from the remote server. It ultimately calls updateService(), which performs the remote request and processes the returned data. The source is as follows:

public void updateService(String serviceName, String clusters) throws NacosException {
    /* Get the service object that currently exists in the local cache */
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        /* Query the server for the instance list, passing this client's UDP port */
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);

        if (StringUtils.isNotEmpty(result)) {
            /* Process the result and update the local cache */
            processServiceJson(result);
        }
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}
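The wait() in getServiceInfo() and the notifyAll() in updateService() work as a pair: a reader that arrives during an update holds for a bounded time, and the updater wakes it when finished. Here is a minimal sketch of that handshake. UpdateHoldSketch and its method names are hypothetical, and the updated flag is an addition of this sketch (the code above uses a plain timed wait) so that a notification arriving before the reader starts waiting is not missed.

```java
class UpdateHoldSketch {

    private final Object serviceObj = new Object();
    private volatile String hosts = "stale";
    // Guarded by serviceObj; added in this sketch to avoid a missed notification.
    private boolean updated;

    /** Waits up to holdMillis for a running update to finish, then returns the current value. */
    String readHoldingForUpdate(long holdMillis) {
        long deadline = System.currentTimeMillis() + holdMillis;
        synchronized (serviceObj) {
            while (!updated) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    break; // hold interval elapsed, return whatever is cached
                }
                try {
                    serviceObj.wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return hosts;
    }

    /** Publishes the new value and wakes every waiting reader. */
    void finishUpdate(String newHosts) {
        hosts = newHosts;
        synchronized (serviceObj) {
            updated = true;
            serviceObj.notifyAll();
        }
    }

    /** Runs an updater that finishes after 50 ms while the caller holds for up to 5 s. */
    static String demo() {
        UpdateHoldSketch sketch = new UpdateHoldSketch();
        Thread updater = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            sketch.finishUpdate("fresh");
        });
        updater.start();
        return sketch.readHoldingForUpdate(5000);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the wait is bounded, a reader never blocks indefinitely: if the update takes longer than the hold interval, it simply returns the value currently cached.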

1.1.3 Service offline

Following the service offline call in the example, we arrive at NamingService.deregisterInstance() in nacos-api and then at its implementation NacosNamingService.deregisterInstance(). The code is as follows:

@Override
public void deregisterInstance(String serviceName, String groupName, String ip, int port, String clusterName)
        throws NacosException {
    // Build instance information 
    Instance instance = new Instance();
    instance.setIp(ip);
    instance.setPort(port);
    instance.setClusterName(clusterName);
    // Service offline operation 
    deregisterInstance(serviceName, groupName, instance);
}

@Override
public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    if (instance.isEphemeral()) {
        // Remove the scheduled heartbeat task
        beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(),
                instance.getPort());
    }
    // Send a remote request to take the instance offline
    serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
}

The service offline method is relatively simple. It does the exact opposite of service registration and also does two things. First, it stops sending heartbeats. Second, it calls the server's service offline interface.
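The two steps and their order can be modelled in a few lines. In the sketch below, DeregisterSketch, the Beat class, the "#"-separated key, and the recorded "POST"/"DELETE" strings are all illustrative assumptions; nothing is sent over the network. It only shows the bookkeeping: drop the heartbeat entry first, then issue the remote removal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class DeregisterSketch {

    /** Heartbeat bookkeeping; in this sketch it only carries a stop flag. */
    static final class Beat {
        volatile boolean stopped;
    }

    private final Map<String, Beat> beats = new ConcurrentHashMap<>();
    // Records what would be sent to the server, in order.
    final List<String> remoteCalls = new ArrayList<>();

    // Illustrative key format for one instance of a service.
    static String key(String serviceName, String ip, int port) {
        return serviceName + "#" + ip + "#" + port;
    }

    void register(String serviceName, String ip, int port) {
        String key = key(serviceName, ip, port);
        beats.put(key, new Beat());          // step 1: start heartbeat bookkeeping
        remoteCalls.add("POST " + key);      // step 2: register with the server
    }

    void deregister(String serviceName, String ip, int port) {
        String key = key(serviceName, ip, port);
        Beat beat = beats.remove(key);       // step 1: stop the heartbeat
        if (beat != null) {
            beat.stopped = true;
        }
        remoteCalls.add("DELETE " + key);    // step 2: remove from the server
    }

    boolean isBeating(String serviceName, String ip, int port) {
        return beats.containsKey(key(serviceName, ip, port));
    }

    public static void main(String[] args) {
        DeregisterSketch client = new DeregisterSketch();
        client.register("nacos.test.3", "2.2.2.2", 9999);
        client.deregister("nacos.test.3", "2.2.2.2", 9999);
        System.out.println(client.remoteCalls);
    }
}
```

One plausible reason for stopping the heartbeat before the remote call: since a failed heartbeat leads to re-registration, a beat that fires after the instance is removed could bring it back.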

1.1.4 Service subscription

Looking at the subscription part of the example: it first creates a thread pool, then hands the thread pool to the listener through getExecutor(). The listener can then listen for changes to the specified service's instances. The code is as follows:

// Service subscription 
Executor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("test-thread");
                return thread;
            }
        });

naming.subscribe("nacos.test.3", new AbstractEventListener() {

    //EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback.
    //So you can override getExecutor() to async handle event.
    @Override
    public Executor getExecutor() {
        return executor;
    }
    // Read the monitored service instance 
    @Override
    public void onEvent(Event event) {
        System.out.println(((NamingEvent) event).getServiceName());
        System.out.println(((NamingEvent) event).getInstances());
    }
});

Following the service subscription call in the example, we arrive at NamingService.subscribe() in nacos-api and then at its implementation NacosNamingService.subscribe(), which leads to the following subscribe() method. The code is as follows:

public void subscribe(String serviceName, String clusters, EventListener eventListener) {
    // Register the listener
    notifier.registerListener(serviceName, clusters, eventListener);
    // Fetch and refresh the service instances
    getServiceInfo(serviceName, clusters);
}

At this point the listener is registered. Registering a listener means adding the listener object to the listenerMap collection; instance information can then be read in the listener's onEvent method. The code is as follows:

public void registerListener(String serviceName, String clusters, EventListener listener) {
    String key = ServiceInfo.getKey(serviceName, clusters);
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if (eventListeners == null) {
        synchronized (lock) {
            eventListeners = listenerMap.get(key);
            if (eventListeners == null) {
                eventListeners = new ConcurrentHashSet<EventListener>();
                listenerMap.put(key, eventListeners);
            }
        }
    }
    // Add the listener to the set; its onEvent method can then read the corresponding instances
    eventListeners.add(listener);
}
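The same registration logic, plus the notification side that the article does not show, can be sketched with JDK collections only. This is an assumption-laden illustration: ListenerRegistrySketch, Listener, and publish() are hypothetical names, ConcurrentHashMap.newKeySet() stands in for ConcurrentHashSet, and computeIfAbsent replaces the double-checked locking above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

class ListenerRegistrySketch {

    /** Hypothetical listener type; receives the service key and its current instances. */
    interface Listener {
        void onEvent(String serviceKey, List<String> instances);
    }

    private final Map<String, Set<Listener>> listenerMap = new ConcurrentHashMap<>();

    /** Adds a listener for the key, creating the listener set atomically on first use. */
    void registerListener(String serviceKey, Listener listener) {
        listenerMap.computeIfAbsent(serviceKey, k -> ConcurrentHashMap.newKeySet()).add(listener);
    }

    /** Delivers a change to every listener of the key, each callback running on the given executor. */
    void publish(String serviceKey, List<String> instances, Executor executor) {
        Set<Listener> listeners = listenerMap.get(serviceKey);
        if (listeners == null) {
            return;
        }
        for (Listener listener : listeners) {
            executor.execute(() -> listener.onEvent(serviceKey, instances));
        }
    }

    public static void main(String[] args) {
        ListenerRegistrySketch registry = new ListenerRegistrySketch();
        List<String> seen = new ArrayList<>();
        registry.registerListener("nacos.test.3", (key, instances) -> seen.add(key + " -> " + instances));
        // Runnable::run executes the callback on the calling thread, which keeps the demo deterministic.
        registry.publish("nacos.test.3", Arrays.asList("11.11.11.11:8888"), Runnable::run);
        System.out.println(seen);
    }
}
```

Passing an executor per delivery mirrors the reason the example overrides getExecutor(): a slow onEvent should not block callbacks for other listeners.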

This article was published by the wild architect teaching and research team of wisdom education's Erudite Valley.
If this article helped you, feel free to follow and like. If you have suggestions, you can also leave a comment or send a private message. Your support is what keeps me writing.
Please credit the source when reprinting!

Copyright notice
This article was created by [Erudite Valley wild architect]. Please include the original link when reprinting. Thanks.
https://yzsam.com/2022/188/202207071558068748.html