Preface

Nacos is a component from Alibaba built for the high-performance microservices era, combining a service registry and a configuration center. Why is Nacos so performant? The main reasons can be summarized as follows:

1. It uses Alibaba's home-grown Distro protocol to synchronize data between Nacos nodes.

2. It uses thread pools and asynchronous processing to improve API response speed.

3. In 2.X, long-lived gRPC connections replace the 1.X model of constantly sending heartbeat packets to the server, which caused high CPU usage.

At the same time, 2.X is a major upgrade over 1.X, both at the architecture level and at the code level. If you are in a position to upgrade to 2.X, it is recommended to upgrade the client together with the server; only then can the advantages of the 2.X architecture be fully realized. A comparison of 1.X and 2.X follows:

|                     | 1.X                                            | 2.X                                                  |
| ------------------- | ---------------------------------------------- | ---------------------------------------------------- |
| Connection method   | HTTP short connections                         | gRPC, plus HTTP short connections (1.X compatible)   |
| Push method         | UDP                                            | gRPC                                                 |
| Health check method | Periodic heartbeat over HTTP short connections | gRPC long connection (lightweight heartbeat)         |

For a performance comparison between Nacos 1.X and 2.X, see: Nacos 2.0 performance comparison and stress test before and after upgrading - Alibaba Cloud developer community (aliyun.com)

Here I borrow the Nacos architecture diagram from the Alibaba Cloud community:

We will now analyze the Nacos source code based on Nacos 2.0.4. Before reading, it is best to be familiar with the following: design patterns (template method, delegation, proxy, singleton, factory, strategy), asynchronous programming, and gRPC.

Startup

First, let's look at the Nacos data model. Nacos isolates environments through namespaces (Namespace). Within a namespace, services can be divided into different groups (Group) according to their relationships, and each group can contain multiple services (Service). For disaster recovery, a service can be split into different clusters (Cluster) deployed in different regions or machine rooms, and each concrete member of a cluster is an instance (Instance), i.e. one of the microservice processes we develop.
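To make this model concrete from the client side, here is a minimal sketch using the Nacos Java client API; the server address, namespace, group, service, and cluster names are illustrative assumptions, not values from this article.

import java.util.Properties;

import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;

public class RegisterDemo {

    public static void main(String[] args) throws NacosException {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); // assumed local server
        properties.put(PropertyKeyConst.NAMESPACE, "dev");              // Namespace isolates environments
        NamingService naming = NacosFactory.createNamingService(properties);

        Instance instance = new Instance();                             // Instance = one running process
        instance.setIp("192.168.0.10");
        instance.setPort(8080);
        instance.setClusterName("hangzhou");                            // Cluster groups instances by region/room
        instance.setEphemeral(true);                                    // Ephemeral instances are the default

        // Register the instance under service "order-service" in group "DEFAULT_GROUP"
        naming.registerInstance("order-service", "DEFAULT_GROUP", instance);
    }
}

On the server side, a registration like this arrives as the InstanceRequest handled in the sections below.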



Because much of Nacos is handled asynchronously, we often cannot read the code in a straight line; the reading will jump back and forth, and asynchronous, event-driven code is relatively hard to follow. Let's first look at the Nacos startup flow. In the code excerpts below I only keep the key code and omit the rest; this applies to the whole article.

Let's look at how request handlers are registered and how the event listening works.

The class com.alibaba.nacos.core.remote.RequestHandlerRegistry listens for the ContextRefreshedEvent, so after Spring Boot starts, the registration logic we need is executed automatically.

/**
 * RequestHandlerRegistry.
 * After Spring initialization, loads every com.alibaba.nacos.core.remote.RequestHandler and registers it.
 *
 * @author liuzunfei
 * @version $Id: RequestHandlerRegistry.java, v 0.1 Jul 13, 2020 8:24 PM liuzunfei Exp $
 */
@Service
public class RequestHandlerRegistry implements ApplicationListener<ContextRefreshedEvent> {
    
    /**
     * Request handler registry, keyed by request type
     */
    Map<String, RequestHandler> registryHandlers = new HashMap<String, RequestHandler>();
    
    @Autowired
    private TpsMonitorManager tpsMonitorManager;
    
    /**
     * Get Request Handler By request Type.
     *
     * @param requestType see definitions of sub constants classes of RequestTypeConstants
     * @return request handler.
     */
    public RequestHandler getByRequestType(String requestType) {
        return registryHandlers.get(requestType);
    }
    
    /**
     * The main job of this listener is to load the subclasses of com.alibaba.nacos.core.remote.RequestHandler
     * into registryHandlers for later request dispatching; this can be seen as an application of the strategy pattern.
     *
     * @param event event
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        Map<String, RequestHandler> beansOfType = event.getApplicationContext().getBeansOfType(RequestHandler.class);
        Collection<RequestHandler> values = beansOfType.values();
        for (RequestHandler requestHandler : values) {
            Class<?> clazz = requestHandler.getClass();
            boolean skip = false;
            while (!clazz.getSuperclass().equals(RequestHandler.class)) {
                if (clazz.getSuperclass().equals(Object.class)) {
                    skip = true;
                    break;
                }
                clazz = clazz.getSuperclass();
            }
            if (skip) {
                continue;
            }
            try {
                Method method = clazz.getMethod("handle", Request.class, RequestMeta.class);
                // Handlers that need TPS monitoring are registered with tpsMonitorManager
                if (method.isAnnotationPresent(TpsControl.class) && TpsControlConfig.isTpsControlEnabled()) {
                    TpsControl tpsControl = method.getAnnotation(TpsControl.class);
                    String pointName = tpsControl.pointName();
                    TpsMonitorPoint tpsMonitorPoint = new TpsMonitorPoint(pointName);
                    tpsMonitorManager.registerTpsControlPoint(tpsMonitorPoint);
                }
            } catch (Exception e) {
                //ignore.
            }
            Class tClass = (Class) ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments()[0];
            // Register the handler, keyed by the simple name of its request type
            registryHandlers.putIfAbsent(tClass.getSimpleName(), requestHandler);
        }
    }
}

com.alibaba.nacos.core.remote.RequestHandler has many implementation classes, and their names roughly tell us what each one does:

| Subclass of com.alibaba.nacos.core.remote.RequestHandler | Role |
| --------------------------------------------------------- | ---- |
| com.alibaba.nacos.config.server.remote.ConfigChangeClusterSyncRequestHandler | Synchronizes configuration changes between cluster nodes |
| com.alibaba.nacos.config.server.remote.ConfigChangeBatchListenRequestHandler | Handles configuration change listen requests |
| com.alibaba.nacos.config.server.remote.ConfigPublishRequestHandler | Handles configuration publish requests |
| com.alibaba.nacos.config.server.remote.ConfigQueryRequestHandler | Handles configuration query requests |
| com.alibaba.nacos.config.server.remote.ConfigRemoveRequestHandler | Handles configuration remove requests |
| com.alibaba.nacos.naming.remote.rpc.handler.DistroDataRequestHandler | Distro consistency handler (data sync between nodes) |
| com.alibaba.nacos.core.remote.HealthCheckRequestHandler | Health check handler |
| com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler | Instance register/deregister handler |
| com.alibaba.nacos.core.remote.core.ServerLoaderInfoRequestHandler | Server loader info handler |
| com.alibaba.nacos.naming.remote.rpc.handler.ServiceListRequestHandler | Service list request handler |
| com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler | Service query handler |

Service registration process

One of the io.grpc.stub.StreamObserver#onNext acceptors listens for client gRPC connections. When a client connects, the connection is registered through connectionManager.register(connectionId, connection); the connection event is then published through the client connection listener registry via clientConnectionEventListenerRegistry.notifyClientConnected(connection), and the listeners carry out the concrete logic of establishing the connection. Only after the connection has been set up does the registration logic run.

com.alibaba.nacos.core.remote.ClientConnectionEventListenerRegistry: the registry of listeners for client connections to Nacos.
The currently known listeners all extend com.alibaba.nacos.core.remote.ClientConnectionEventListener:

// Currently empty; likely reserved for future extension
com.alibaba.nacos.config.server.remote.ConfigConnectionEventListener
// Manages client connections: connect, disconnect, and validity checks; an internal thread pool periodically cleans up invalid connections
com.alibaba.nacos.naming.core.v2.client.manager.impl.ConnectionBasedClientManager
// Initializes and cleans up gRPC ack callbacks
com.alibaba.nacos.core.remote.core.RpcAckCallbackInitorOrCleaner
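To illustrate how such a listener plugs in, here is a minimal sketch of a custom listener. The class itself is illustrative, not part of Nacos; it assumes the 2.0.x ClientConnectionEventListener base class with the abstract methods clientConnected and clientDisConnected, and that Spring-managed subclasses are picked up by the registry automatically.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.alibaba.nacos.core.remote.ClientConnectionEventListener;
import com.alibaba.nacos.core.remote.Connection;

// Illustrative listener (not a Nacos class): logs connect/disconnect events.
@Component
public class LoggingConnectionEventListener extends ClientConnectionEventListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(LoggingConnectionEventListener.class);

    @Override
    public void clientConnected(Connection connect) {
        // Invoked after connectionManager.register(...) triggers notifyClientConnected(connection)
        LOGGER.info("client connected: {}", connect);
    }

    @Override
    public void clientDisConnected(Connection connect) {
        LOGGER.info("client disconnected: {}", connect);
    }
}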

Let's take com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler, which handles service registration requests, and analyze its processing flow.

// Defines the request-handling flow
public abstract class RequestHandler<T extends Request, S extends Response> {
    
    @Autowired
    private RequestFilters requestFilters;
    
    public Response handleRequest(T request, RequestMeta meta) throws NacosException {
        for (AbstractRequestFilter filter : requestFilters.filters) {
            try {
                Response filterResult = filter.filter(request, meta, this.getClass());
                if (filterResult != null && !filterResult.isSuccess()) {
                    return filterResult;
                }
            } catch (Throwable throwable) {
                Loggers.REMOTE.error("filter error", throwable);
            }
        }
        return handle(request, meta);
    }
    
    public abstract S handle(T request, RequestMeta meta) throws NacosException;
}

@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
    
    private final EphemeralClientOperationServiceImpl clientOperationService;
    
    public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
        this.clientOperationService = clientOperationService;
    }
    
    @Override
    @Secured(action = ActionTypes.WRITE, parser = NamingResourceParser.class)
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        Service service = Service
                .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        switch (request.getType()) {
            case NamingRemoteConstants.REGISTER_INSTANCE:
                return registerInstance(service, request, meta);
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM,
                        String.format("Unsupported request type %s", request.getType()));
        }
    }
    
    /**
     * Delegates to clientOperationService to register the instance.
     *
     * @param service service
     * @param request request
     * @param meta    meta
     * @return com.alibaba.nacos.api.naming.remote.response.InstanceResponse
     */
    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }
    
    private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
        clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
    }
}

You can see that InstanceRequestHandler extends RequestHandler. The parent class defines the request processing flow in handleRequest and leaves the concrete handling to its subclasses, a typical application of the template method pattern. The subclass then branches on request.getType() into register-instance and deregister-instance handling, and delegates to com.alibaba.nacos.naming.core.v2.service.impl.EphemeralClientOperationServiceImpl for the actual registration and deregistration logic.
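Stripped of the Nacos-specific types, the template method pattern used here boils down to the following self-contained sketch; the names are illustrative, not Nacos classes.

// Generic illustration of the template method pattern used by RequestHandler.
abstract class Handler<T, R> {

    // The template method: a fixed pre-processing step, then the step subclasses fill in.
    public final R handleRequest(T request) {
        if (!validate(request)) {
            throw new IllegalArgumentException("invalid request");
        }
        return handle(request);
    }

    protected boolean validate(T request) {
        return request != null;
    }

    // Subclasses supply the concrete handling logic.
    protected abstract R handle(T request);
}

class EchoHandler extends Handler<String, String> {
    @Override
    protected String handle(String request) {
        return "echo: " + request;
    }
}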

As we know, Nacos instances come in two kinds, ephemeral and persistent, with ephemeral being the default. Here the EphemeralClientOperationServiceImpl bean is injected directly instead of going through the ClientOperationServiceProxy, because the handling logic for persistent instances does not live here.
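For contrast, the proxy route dispatches per instance type. Here is a simplified sketch of that idea under the assumption that both EphemeralClientOperationServiceImpl and PersistentClientOperationServiceImpl live in com.alibaba.nacos.naming.core.v2.service.impl; it is illustrative, not the exact Nacos implementation.

import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.core.v2.service.impl.EphemeralClientOperationServiceImpl;
import com.alibaba.nacos.naming.core.v2.service.impl.PersistentClientOperationServiceImpl;

// Simplified sketch: choose the ephemeral or persistent operation service per instance.
class ClientOperationServiceProxySketch {

    private final EphemeralClientOperationServiceImpl ephemeralService;

    private final PersistentClientOperationServiceImpl persistentService;

    ClientOperationServiceProxySketch(EphemeralClientOperationServiceImpl ephemeralService,
            PersistentClientOperationServiceImpl persistentService) {
        this.ephemeralService = ephemeralService;
        this.persistentService = persistentService;
    }

    public void registerInstance(Service service, Instance instance, String clientId) {
        // Ephemeral instances are handled in memory (Distro); persistent ones go through durable storage.
        if (instance.isEphemeral()) {
            ephemeralService.registerInstance(service, instance, clientId);
        } else {
            persistentService.registerInstance(service, instance, clientId);
        }
    }
}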

After all these steps, we finally reach the actual instance registration logic:

    /**
     * Register an instance.
     *
     * @param service  service
     * @param instance instance
     * @param clientId connectionId
     */
    @Override
    public void registerInstance(Service service, Instance instance, String clientId) {
        // Get the singleton Service (namespace, group, name); the existing one is reused if present
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        if (!singleton.isEphemeral()) {
            throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                    String.format("Current service %s is persistent service, can't register ephemeral instance.",
                            singleton.getGroupedServiceName()));
        }
        // Get the client by connection id
        Client client = clientManager.getClient(clientId);
        if (!clientIsLegal(client, clientId)) {
            return;
        }
        // Build the instance publish info
        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        // Cache the Service and instanceInfo on the connected client, which also publishes a client changed event
        client.addServiceInstance(singleton, instanceInfo);
        // Update the client's last updated time
        client.setLastUpdatedTime();
        // Publish the client-registered-service event
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        // Publish the instance metadata event (metadataId => ip:port:clusterName)
        NotifyCenter
                .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }

You can see that the registration flow is: obtain the existing service, or create a new one if it does not exist; get the corresponding Client by clientId; create an InstancePublishInfo; attach the Service and InstancePublishInfo to the client held by the ClientManager; and finally publish two events. And that's it? Where is the data stored? Where is the "registration"? At first I had the same series of questions and did not know where the data went. Later, by following the interface the console uses, /nacos/v1/ns/catalog/services, I found that its data is read from a class called ServiceStorage, and then, tracing backwards from there, that the data storage happens in the execution engine after a chain of operations triggered by the published events.

Nacos data storage

ServiceStorage.java

/**
 * Service storage.
 *
 * @author xiweng.yy
 */
@Component
public class ServiceStorage {
    
    /**
     * Index manager for services registered by client connections
     */
    private final ClientServiceIndexesManager serviceIndexesManager;
    
    private final ClientManager clientManager;
    
    private final SwitchDomain switchDomain;
    
    private final NamingMetadataManager metadataManager;
    
    /**
     * Service information cache
     */
    private final ConcurrentMap<Service, ServiceInfo> serviceDataIndexes;
    
    /**
     * Cluster index, key:value => Service : Set(ClusterName)
     */
    private final ConcurrentMap<Service, Set<String>> serviceClusterIndex;
    
    public ServiceStorage(ClientServiceIndexesManager serviceIndexesManager, ClientManagerDelegate clientManager,
            SwitchDomain switchDomain, NamingMetadataManager metadataManager) {
        this.serviceIndexesManager = serviceIndexesManager;
        this.clientManager = clientManager;
        this.switchDomain = switchDomain;
        this.metadataManager = metadataManager;
        this.serviceDataIndexes = new ConcurrentHashMap<>();
        this.serviceClusterIndex = new ConcurrentHashMap<>();
    }
    
    /**
     * Get the cluster names under the given service.
     *
     * @param service service
     * @return java.util.Set
     */
    public Set<String> getClusters(Service service) {
        return serviceClusterIndex.getOrDefault(service, new HashSet<>());
    }
    
    /**
     * Get the data of the given service.
     *
     * @param service service
     * @return com.alibaba.nacos.api.naming.pojo.ServiceInfo
     */
    public ServiceInfo getData(Service service) {
        return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
    }
    
    /**
     * If the service does not exist in com.alibaba.nacos.naming.core.v2.ServiceManager, return an empty result;
     * otherwise refresh the Cluster and Instance information of the Service.
     *
     * @param service service
     * @return com.alibaba.nacos.api.naming.pojo.ServiceInfo
     */
    public ServiceInfo getPushData(Service service) {
        ServiceInfo result = emptyServiceInfo(service);
        // Return directly if the ServiceManager does not contain the service; otherwise refresh it
        if (!ServiceManager.getInstance().containSingleton(service)) {
            return result;
        }
        // Refresh the instances (and the cluster index) under the Service
        result.setHosts(getAllInstancesFromIndex(service));
        // Cache the refreshed service data
        serviceDataIndexes.put(service, result);
        return result;
    }
    
    public void removeData(Service service) {
        serviceDataIndexes.remove(service);
        serviceClusterIndex.remove(service);
    }
    
    private ServiceInfo emptyServiceInfo(Service service) {
        ServiceInfo result = new ServiceInfo();
        result.setName(service.getName());
        result.setGroupName(service.getGroup());
        result.setLastRefTime(System.currentTimeMillis());
        result.setCacheMillis(switchDomain.getDefaultPushCacheMillis());
        return result;
    }
    
    /**
     * Get all Instance information under the given Service and update its cluster index.
     *
     * @param service service
     * @return java.util.List
     */
    private List<Instance> getAllInstancesFromIndex(Service service) {
        Set<Instance> result = new HashSet<>();
        Set<String> clusters = new HashSet<>();
        for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
            Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
            if (instancePublishInfo.isPresent()) {
                // Build the instance and apply its metadata
                Instance instance = parseInstance(service, instancePublishInfo.get());
                result.add(instance);
                clusters.add(instance.getClusterName());
            }
        }
        // cache clusters of this service
        serviceClusterIndex.put(service, clusters);
        return new LinkedList<>(result);
    }
    
    private Optional<InstancePublishInfo> getInstanceInfo(String clientId, Service service) {
        Client client = clientManager.getClient(clientId);
        if (null == client) {
            return Optional.empty();
        }
        return Optional.ofNullable(client.getInstancePublishInfo(service));
    }
    
    private Instance parseInstance(Service service, InstancePublishInfo instanceInfo) {
        Instance result = InstanceUtil.parseToApiInstance(service, instanceInfo);
        Optional<InstanceMetadata> metadata = metadataManager
                .getInstanceMetadata(service, instanceInfo.getMetadataId());
        metadata.ifPresent(instanceMetadata -> InstanceUtil.updateInstanceMetadata(result, instanceMetadata));
        return result;
    }
}

You can see that the most important methods are getData and getPushData. getData falls back to getPushData when the service is not cached yet, and getPushData, via getAllInstancesFromIndex, fetches and refreshes all of the service's cluster and instance information, so that everything under the Service ends up cached inside ServiceStorage.

Nacos registration-related events

Nacos events are divided into regular events and slow events, whose parent classes are com.alibaba.nacos.common.notify.Event and com.alibaba.nacos.common.notify.SlowEvent. Subscribers and publishers are likewise divided into multi-event and single-event publishers (subscribers), and the notification hub is com.alibaba.nacos.common.notify.NotifyCenter; I will not go into detail here. Below is a brief overview of the event types related to instance registration, when they are triggered, and who listens to them; see the table below.

| Event | Meaning | Triggered when | Listeners |
| ----- | ------- | -------------- | --------- |
| com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent.ClientRegisterServiceEvent | A client registers an instance | 1. A client sends a register-instance request; 2. The consistency protocol proactively notifies the client to update its state | com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation |
| com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent.ServiceChangedEvent | An instance has changed | 1. A client registers an instance; 2. A client removes a registered instance; 3. A client updates instance metadata; 4. Client heartbeat processing (published only when the instance is unhealthy); 5. Unhealthy instance detection | 1. com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteEventListener#onEvent; 2. com.alibaba.nacos.naming.push.v2.NamingSubscriberServiceV2Impl#onEvent |
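To show the publish/subscribe mechanism behind these events, here is a minimal sketch of a subscriber registered with NotifyCenter. It assumes the Subscriber/registerSubscriber API from nacos-common 2.x and a getService() accessor on the event; the subscriber class itself is illustrative, not part of Nacos.

import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent;

// Illustrative subscriber: logs every ClientRegisterServiceEvent published to NotifyCenter.
public class RegisterEventLogger extends Subscriber<ClientOperationEvent.ClientRegisterServiceEvent> {

    public static void register() {
        // Subscribers are registered once with the notification hub
        NotifyCenter.registerSubscriber(new RegisterEventLogger());
    }

    @Override
    public void onEvent(ClientOperationEvent.ClientRegisterServiceEvent event) {
        System.out.println("service registered: " + event.getService());
    }

    @Override
    public Class<? extends Event> subscribeType() {
        return ClientOperationEvent.ClientRegisterServiceEvent.class;
    }
}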

Nacos execution engines

After the preceding events are triggered, a chain of logic eventually reaches the execution engines, which carry out the actual tasks. Two execution engines are involved here (double write and delayed push): com.alibaba.nacos.naming.push.v2.task.PushDelayTaskExecuteEngine and com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine.

Looking first at the class diagram of the execution engines, you can find that both engines extend com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine; their parent is exactly the same, each just defines its own execution logic.

Double-write execution engine

@Component
public class DoubleWriteDelayTaskEngine extends NacosDelayTaskExecuteEngine {
    
    public DoubleWriteDelayTaskEngine() {
        // Engine name and logger
        super(DoubleWriteDelayTaskEngine.class.getSimpleName(), Loggers.SRV_LOG);
        // Register the v1 task processor
        addProcessor("v1", new ServiceChangeV1Task.ServiceChangeV1TaskProcessor());
        // Register the v2 task processor
        addProcessor("v2", new ServiceChangeV2Task.ServiceChangeV2TaskProcessor());
    }
    
    @Override
    public NacosTaskProcessor getProcessor(Object key) {
        String actualKey = key.toString().split(":")[0];
        return super.getProcessor(actualKey);
    }
}

From the constructor we can see that the double-write engine registers two task processors, v1 and v2, so that the version upgrade can proceed smoothly. Once the whole cluster has been upgraded and is stable, double write can be turned off; this is also mentioned in the Nacos upgrade documentation (Nacos 2.0 upgrade documents).

public class PushDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {
    
    /**
     * Client manager
     */
    private final ClientManager clientManager;
    
    /**
     * Client-service index manager
     */
    private final ClientServiceIndexesManager indexesManager;
    
    /**
     * Data storage
     */
    private final ServiceStorage serviceStorage;
    
    /**
     * Metadata manager
     */
    private final NamingMetadataManager metadataManager;
    
    /**
     * Push executor
     */
    private final PushExecutor pushExecutor;
    
    private final SwitchDomain switchDomain;
    
    public PushDelayTaskExecuteEngine(ClientManager clientManager, ClientServiceIndexesManager indexesManager,
            ServiceStorage serviceStorage, NamingMetadataManager metadataManager,
            PushExecutor pushExecutor, SwitchDomain switchDomain) {
        super(PushDelayTaskExecuteEngine.class.getSimpleName(), Loggers.PUSH);
        this.clientManager = clientManager;
        this.indexesManager = indexesManager;
        this.serviceStorage = serviceStorage;
        this.metadataManager = metadataManager;
        this.pushExecutor = pushExecutor;
        this.switchDomain = switchDomain;
        // Set the custom default task processor
        setDefaultTaskProcessor(new PushDelayTaskProcessor(this));
    }
    
    public ClientManager getClientManager() {
        return clientManager;
    }
    
    public ClientServiceIndexesManager getIndexesManager() {
        return indexesManager;
    }
    
    public ServiceStorage getServiceStorage() {
        return serviceStorage;
    }
    
    public NamingMetadataManager getMetadataManager() {
        return metadataManager;
    }
    
    public PushExecutor getPushExecutor() {
        return pushExecutor;
    }
    
    @Override
    protected void processTasks() {
        if (!switchDomain.isPushEnabled()) {
            return;
        }
        super.processTasks();
    }
    
    /**
     * Custom default processor
     */
    private static class PushDelayTaskProcessor implements NacosTaskProcessor {
        
        private final PushDelayTaskExecuteEngine executeEngine;
        
        public PushDelayTaskProcessor(PushDelayTaskExecuteEngine executeEngine) {
            this.executeEngine = executeEngine;
        }
        
        @Override
        public boolean process(NacosTask task) {
            PushDelayTask pushDelayTask = (PushDelayTask) task;
            Service service = pushDelayTask.getService();
            // Dispatch the push task to the task execution engine
            NamingExecuteTaskDispatcher.getInstance()
                    .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
            return true;
        }
    }
}

Nacos task dispatcher

public class NamingExecuteTaskDispatcher {
    
    private static final NamingExecuteTaskDispatcher INSTANCE = new NamingExecuteTaskDispatcher();
    
    private final NacosExecuteTaskExecuteEngine executeEngine;
    
    private NamingExecuteTaskDispatcher() {
        // The Nacos task execution engine
        executeEngine = new NacosExecuteTaskExecuteEngine(EnvUtil.FUNCTION_MODE_NAMING, Loggers.SRV_LOG);
    }
    
    public static NamingExecuteTaskDispatcher getInstance() {
        return INSTANCE;
    }
    
    /**
     * Add a task to the execution engine.
     *
     * @param dispatchTag the tag used to decide which worker the task is assigned to
     * @param task        task
     */
    public void dispatchAndExecuteTask(Object dispatchTag, AbstractExecuteTask task) {
        executeEngine.addTask(dispatchTag, task);
    }
    
    public String workersStatus() {
        return executeEngine.workersStatus();
    }
}

You can see that the task is added to the Nacos task queue and uniformly handed over to the Nacos task execution engine for execution.


NacosExecuteTaskExecuteEngine.java

public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {
    
    private final TaskExecuteWorker[] executeWorkers;
    
    public NacosExecuteTaskExecuteEngine(String name, Logger logger) {
        // The worker count is derived from the CPU count: the smallest power of two >= CPU count * threadMultiple
        this(name, logger, ThreadUtils.getSuitableThreadCount(1));
    }
    
    public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) {
        super(logger);
        executeWorkers = new TaskExecuteWorker[dispatchWorkerCount];
        for (int mod = 0; mod < dispatchWorkerCount; ++mod) {
            executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog());
        }
    }
    
    @Override
    public int size() {
        int result = 0;
        for (TaskExecuteWorker each : executeWorkers) {
            result += each.pendingTaskCount();
        }
        return result;
    }
    
    @Override
    public boolean isEmpty() {
        return 0 == size();
    }
    
    @Override
    public void addTask(Object tag, AbstractExecuteTask task) {
        NacosTaskProcessor processor = getProcessor(tag);
        // If a custom processor is registered for this tag, let it process the task
        if (null != processor) {
            processor.process(task);
            return;
        }
        // Otherwise pick the worker for this tag
        TaskExecuteWorker worker = getWorker(tag);
        // and let it execute the task
        worker.process(task);
    }
    
    /**
     * Decide, based on the tag, which worker executes the task.
     *
     * @param tag tag
     * @return worker
     */
    private TaskExecuteWorker getWorker(Object tag) {
        // Keeps idx within 0 ~ workersCount-1
        int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount();
        return executeWorkers[idx];
    }
    
    private int workersCount() {
        return executeWorkers.length;
    }
    
    @Override
    public AbstractExecuteTask removeTask(Object key) {
        throw new UnsupportedOperationException("ExecuteTaskEngine do not support remove task");
    }
    
    @Override
    public Collection<Object> getAllTaskKeys() {
        throw new UnsupportedOperationException("ExecuteTaskEngine do not support get all task keys");
    }
    
    @Override
    public void shutdown() throws NacosException {
        for (TaskExecuteWorker each : executeWorkers) {
            each.shutdown();
        }
    }
    
    /**
     * Get workers status.
     *
     * @return workers status string
     */
    public String workersStatus() {
        StringBuilder sb = new StringBuilder();
        for (TaskExecuteWorker worker : executeWorkers) {
            sb.append(worker.status()).append('\n');
        }
        return sb.toString();
    }
}

You can see that the main job of this class is to size the worker pool according to the number of CPU cores, and then pick the corresponding worker to execute a task: addTask chooses a worker by the tag's hash and hands the task over via worker.process(task). It is natural to guess that each worker holds a thread that loops, taking tasks from a blocking queue and executing them.

TaskExecuteWorker.java

public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {
    
    /**
     * Max task queue size 32768.
     */
    private static final int QUEUE_CAPACITY = 1 << 15;
    
    private final Logger log;
    
    private final String name;
    
    /**
     * Blocking queue of pending tasks
     */
    private final BlockingQueue<Runnable> queue;
    
    private final AtomicBoolean closed;
    
    public TaskExecuteWorker(final String name, final int mod, final int total) {
        this(name, mod, total, null);
    }
    
    public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
        this.name = name + "_" + mod + "%" + total;
        this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
        this.closed = new AtomicBoolean(false);
        this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
        // Start the worker thread
        new InnerWorker(name).start();
    }
    
    public String getName() {
        return name;
    }
    
    /**
     * Put the task into the queue.
     *
     * @param task task
     * @return whether the task was accepted
     */
    @Override
    public boolean process(NacosTask task) {
        if (task instanceof AbstractExecuteTask) {
            putTask((Runnable) task);
        }
        return true;
    }
    
    private void putTask(Runnable task) {
        try {
            queue.put(task);
        } catch (InterruptedException ire) {
            log.error(ire.toString(), ire);
        }
    }
    
    public int pendingTaskCount() {
        return queue.size();
    }
    
    /**
     * Worker status.
     */
    public String status() {
        return name + ", pending tasks: " + pendingTaskCount();
    }
    
    @Override
    public void shutdown() throws NacosException {
        queue.clear();
        closed.compareAndSet(false, true);
    }
    
    /**
     * The worker thread: loops forever, taking tasks from the blocking queue and running them.
     */
    private class InnerWorker extends Thread {
        
        InnerWorker(String name) {
            setDaemon(false);
            setName(name);
        }
        
        @Override
        public void run() {
            while (!closed.get()) {
                try {
                    Runnable task = queue.take();
                    long begin = System.currentTimeMillis();
                    task.run();
                    long duration = System.currentTimeMillis() - begin;
                    if (duration > 1000L) {
                        log.warn("task {} takes {}ms", task, duration);
                    }
                } catch (Throwable e) {
                    log.error("[TASK-FAILED] " + e.toString(), e);
                }
            }
        }
    }
}

Summary

This article compared Nacos 1.X and 2.X and the advantages of 2.X over 1.X, and analyzed at the source-code level where those advantages of Nacos 2.X come from. From this chapter you can learn a lot about Nacos's design, which is also very helpful for our own code: many design patterns are used (template method, observer, delegation, proxy, singleton, factory, strategy), components are decoupled through events (the observer pattern), and asynchronous programming is used to improve responsiveness as much as possible. I believe that reading the source code carefully will greatly improve our own code design and multi-threaded programming.
