当前位置:网站首页>【云原生】Nacos-TaskManager 任务管理的使用
【云原生】Nacos-TaskManager 任务管理的使用
2022-07-05 12:38:00 【石臻臻的杂货铺】
任务管理类
因为Nacos中有很多地方使用了这个TaskManager,所以我们得先了解一下这个类是干啥用的,方便后面阅读源码时候不会吃力;
先说结论:
TaskManager 可以看成是一个待执行的任务集合,用于处理一定要执行成功的任务 单线程的方式处理任务,保证任务一定被成功处理; 如果执行失败了,任务会被重新放入集合中等待下一次被消费;
AbstractTask
AbstractTask是个抽象类,所有的需要被执行的任务都继续这个类; 这个类主要提供执行任务所需要的数据和方法;例如
/* 一个任务两次处理的间隔,单位是毫秒*/
private long taskInterval;
/*任务上次被处理的时间,用毫秒表示*/
private long lastProcessTime;
/* TaskManager 判断当前是否需要处理这个Task,子类可以Override这个函数实现自己的逻辑 */
public boolean shouldProcess() {
return (System.currentTimeMillis() - this.lastProcessTime >= this.taskInterval);
}
TaskProcessor任务处理器
TaskProcessor 是任务处理器接口,它有个方法
boolean process(String taskType, AbstractTask task);
用于执行对应的AbstractTask任务类; 不同的任务类型,可以实现自己的执行任务逻辑;
TaskManager任务管理类
TaskManager 是个任务管理类;
它里面有两个属性保存了待消费的任务AbstractTask,和任务执行需要的TaskProcessor;
/**待消费的任务AbstractTask**/
private final ConcurrentHashMap<String, AbstractTask> tasks = new ConcurrentHashMap<String, AbstractTask>();
/**任务AbstractTask对应的任务执行器TaskProcessor**/
private final ConcurrentHashMap<String, TaskProcessor> taskProcessors =new ConcurrentHashMap<String, TaskProcessor>();
如果taskProcessors中没有找到对应的任务执行器,那么它里面有一个默认执行器会执行
/**默认执行器**/
private TaskProcessor defaultTaskProcessor;
使用用例
Nacos配置中心模块很重要一个功能就是,在初始化的时候以及每隔一段时间就会去数据库中把所有数据Dump到磁盘中;Dump就是一个任务类AbstractTask; 我们上面说过
AbstractTask就是一个信息承载对象,主要给TaskProcessor提供执行所需要的数据;我们看看DumpTask;
DumpTask

DumpTask定义了自己的一些属性; 再看看其他的例如DumpAllTask、DumpAllBetaTask

这两个任务类只定义了TASK_ID
既然有DumpTask任务类,那肯定就有对应的任务处理器类DumpProcessor;
DumpProcessor
DumpProcessor 是DumpTask任务的执行器;执行器中的方法
public boolean process(String taskType, AbstractTask task)
代码太长就不在这里分析了,它里面主要做的操作就是 保存配置文件到本地磁盘中,并缓存md5
详细可以看文章 【Nacos源码之配置管理 四】DumpService如何将配置文件全部Dump到磁盘中
对应DumpAllTask、DumpAllBetaTask 任务的任务执行器有DumpAllProcessor、DumpAllBetaProcessor
DumpAllTask任务触发执行的地方
上面是DumpAllTask的定义和DumpAllTaskProcessor执行器的定义;定义好了之后是怎么被触发的呢?
DumpService初始化Dump配置信息
这个类就是专门Dump配置信息的服务类;上面提及的DumpAll就是在这里被调用的;我们来看下他主要方法;
@PostConstruct
public void init() {
DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);
/**在new这个TaskManager类的时候,专门执行任务的一个线程就已经开始启动了,这不过这个时候还没有任务Task添加进去**/
dumpAllTaskMgr = new TaskManager( "com.alibaba.nacos.server.DumpAllTaskManager");
dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);
Runnable dumpAll = new Runnable() {
@Override
public void run() {
dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
}
};
/**每10分钟执行一次DumpAll操作**/
TimerTaskService.scheduleWithFixedDelay(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE,
TimeUnit.MINUTES);
}
DumpService在初始化的时候回调用这个init方法;
1.先new了一个DumpAllProcessor执行器;
2.再new 了一个TaskManager任务管理器;在new这个任务管理器的时候,就会启动一个线程专门去执行所有待执行的任务;只不过这个时候还没有添加任务;
3.将这个任务管理器的默认执行器设置为DumpAllProcessor;
4.每十分钟执行一次往TaskManager中添加一个DumpAllTask的任务;一经添加就会被TaskManager中的线程 processingThread 执行process方法;
边栏推荐
- Distributed solution - Comprehensive decryption of distributed task scheduling platform -xxljob
- Kotlin function
- PXE startup configuration and principle
- Differences between IPv6 and IPv4 three departments including the office of network information technology promote IPv6 scale deployment
- CVPR 2022 | 基于稀疏 Transformer 的单步三维目标识别器
- Correct opening method of redis distributed lock
- SAP self-development records user login logs and other information
- struct MySQL
- Learn the garbage collector of JVM -- a brief introduction to Shenandoah collector
- Compilation principle reading notes (1/12)
猜你喜欢

Taobao order interface | order flag remarks, may be the most stable and easy-to-use interface

UNIX socket advanced learning diary -ipv4-ipv6 interoperability

ActiveMQ installation and deployment simple configuration (personal test)

Making and using the cutting tool of TTF font library

Annotation problem and hidden Markov model

Redis highly available slice cluster

C language structure is initialized as a function parameter

Transactions from December 27 to 28, 2021

Understand redis persistence mechanism in one article

Resnet+attention project complete code learning
随机推荐
VoneDAO破解组织发展效能难题
Pytoch monolayer bidirectional_ LSTM implements MNIST and fashionmnist data classification
Language model
UNIX socket advanced learning diary -ipv4-ipv6 interoperability
Average lookup length when hash table lookup fails
GPON other manufacturers' configuration process analysis
Handwriting blocking queue: condition + lock
Kotlin process control and circulation
Oppo Xiaobu launched Obert, a large pre training model, and promoted to the top of kgclue
非技术部门,如何参与 DevOps?
Distributed solution - Comprehensive decryption of distributed task scheduling platform - xxljob scheduling center cluster
HiEngine:可媲美本地的云原生内存数据库引擎
Pytoch through datasets Imagefolder loads datasets directly from files
Learn JVM garbage collection 05 - root node enumeration, security points, and security zones (hotspot)
Redis clean cache
Detailed steps for upgrading window mysql5.5 to 5.7.36
Taobao order interface | order flag remarks, may be the most stable and easy-to-use interface
10 minute fitness method reading notes (5/5)
Kotlin variable
NFT: how to make money with unique assets?