[Elastic-Job source code analysis] - job listener
2022-07-31 05:47:00 【ham programming】
Overview
Reference: 芋道源码 - Elastic Job Lite
Source code download: Elastic-Job source repository
Elastic-Job website: [Elastic-Job official documentation]
Elastic-Job code version: 3.0.0-RC1-SNAPSHOT
Content
UML diagram of the job listeners (a side note: IDEA's [Diagrams] tool is very helpful for analyzing source code and is recommended)
1. ElasticJobListener runs on every job node; it is also known as the local listener.
2. AbstractDistributeOnceElasticJobListener implements ElasticJobListener and overrides beforeJobExecuted and afterJobExecuted so that, in a distributed scenario, only a single node executes the listener logic.
3. AbstractDistributeOnceElasticJobListener relies on GuaranteeService, which operates on zk nodes, to achieve single-node execution. The relationship between the two is composition.
4. AbstractListenerManager is the listener-management abstraction; it defines a method for adding listener classes.
5. GuaranteeListenerManager builds on AbstractListenerManager. During initialization it instantiates StartedNodeRemovedJobListener and CompletedNodeRemovedJobListener. These two classes listen for node changes and thereby control the wake-up of instances waiting in the before and after callbacks of AbstractDistributeOnceElasticJobListener.
ElasticJobListener
The source code has two main methods: beforeJobExecuted and afterJobExecuted. If the job processes files on the job server and the files should be deleted once processing is complete, consider having every node run the cleanup task. This type of listener is simple to implement and does not need to consider whether the globally distributed job has completed, so prefer it whenever possible.

```java
public interface ElasticJobListener {

    /**
     * Called before the job is executed.
     *
     * @param shardingContexts sharding contexts
     */
    void beforeJobExecuted(final ShardingContexts shardingContexts);

    /**
     * Called after the job is executed.
     *
     * @param shardingContexts sharding contexts
     */
    void afterJobExecuted(final ShardingContexts shardingContexts);
}
```
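The file-cleanup scenario described above can be sketched as follows. This is only an illustration under stated assumptions: so that it compiles without the Elastic-Job dependency, `LocalJobListener` is a simplified stand-in for `ElasticJobListener` that takes a job name instead of `ShardingContexts`, and `TempFileCleanupListener` and `LocalListenerDemo` are hypothetical names that do not exist in the library.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Stand-in for ElasticJobListener (assumption): same two callbacks,
// but the argument is a plain job name rather than ShardingContexts.
interface LocalJobListener {
    void beforeJobExecuted(String jobName);

    void afterJobExecuted(String jobName);
}

// Hypothetical local listener: every node deletes the files in its own work directory.
class TempFileCleanupListener implements LocalJobListener {
    private final Path workDir;

    TempFileCleanupListener(Path workDir) {
        this.workDir = workDir;
    }

    @Override
    public void beforeJobExecuted(String jobName) {
        // Nothing to prepare in this sketch.
    }

    @Override
    public void afterJobExecuted(String jobName) {
        try (Stream<Path> files = Files.list(workDir)) {
            // Collect first, then delete, so the directory is not modified while it is being listed.
            List<Path> toDelete = files.filter(Files::isRegularFile).collect(Collectors.toList());
            for (Path file : toDelete) {
                Files.delete(file);
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}

class LocalListenerDemo {
    // Creates two files, runs the listener callbacks around a pretend job,
    // and returns how many files are left in the work directory.
    static long runOnce() {
        try {
            Path workDir = Files.createTempDirectory("job-files");
            Files.createFile(workDir.resolve("a.dat"));
            Files.createFile(workDir.resolve("b.dat"));
            LocalJobListener listener = new TempFileCleanupListener(workDir);
            listener.beforeJobExecuted("demoJob");
            // ... the job itself would process the files here ...
            listener.afterJobExecuted("demoJob");
            long remaining;
            try (Stream<Path> left = Files.list(workDir)) {
                remaining = left.count();
            }
            Files.delete(workDir);
            return remaining;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println("files left after job: " + runOnce());
    }
}
```

Because each node only touches its own directory, no coordination through zk is needed, which is the reason the local listener is the simpler choice.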
AbstractDistributeOnceElasticJobListener
This class implements ElasticJobListener. It overrides beforeJobExecuted and afterJobExecuted so that only one of the nodes executes the listener logic. The implementation relies on GuaranteeService to achieve single-node execution in a distributed scenario.

```java
public abstract class AbstractDistributeOnceElasticJobListener implements ElasticJobListener {

    /**
     * Start timeout, to avoid deadlocks caused by unsynchronized jobs.
     */
    private final long startedTimeoutMilliseconds;

    /**
     * Wait object for the start phase, to avoid deadlocks caused by unsynchronized jobs.
     */
    private final Object startedWait = new Object();

    /**
     * Completion timeout.
     */
    private final long completedTimeoutMilliseconds;

    /**
     * Wait object for the completion phase.
     */
    private final Object completedWait = new Object();

    /**
     * Service that guarantees the start and completion states of all distributed tasks.
     */
    @Setter
    private GuaranteeService guaranteeService;

    private final TimeService timeService = new TimeService();

    public AbstractDistributeOnceElasticJobListener(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) {
        this.startedTimeoutMilliseconds = startedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : startedTimeoutMilliseconds;
        this.completedTimeoutMilliseconds = completedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : completedTimeoutMilliseconds;
    }

    @Override
    public final void beforeJobExecuted(final ShardingContexts shardingContexts) {
        Set<Integer> shardingItems = shardingContexts.getShardingItemParameters().keySet();
        if (shardingItems.isEmpty()) {
            return;
        }
        // Register that these sharding items have started running.
        guaranteeService.registerStart(shardingItems);
        // Until the registration of these sharding items is visible, sleep this thread for 100 ms at a time.
        while (!guaranteeService.isRegisterStartSuccess(shardingItems)) {
            BlockUtils.waitingShortTime();
        }
        // If all sharding items have started, run the callback on this node.
        if (guaranteeService.isAllStarted()) {
            // Execute the callback.
            doBeforeJobExecutedAtLastStarted(shardingContexts);
            // Clear the start information.
            guaranteeService.clearAllStartedInfo();
            return;
        }
        long before = timeService.getCurrentMillis();
        try {
            synchronized (startedWait) {
                startedWait.wait(startedTimeoutMilliseconds);
            }
        } catch (final InterruptedException ex) {
            Thread.interrupted();
        }
        if (timeService.getCurrentMillis() - before >= startedTimeoutMilliseconds) {
            // Clear the start information.
            guaranteeService.clearAllStartedInfo();
            // Throw the timeout exception.
            handleTimeout(startedTimeoutMilliseconds);
        }
    }

    @Override
    public final void afterJobExecuted(final ShardingContexts shardingContexts) {
        Set<Integer> shardingItems = shardingContexts.getShardingItemParameters().keySet();
        if (shardingItems.isEmpty()) {
            return;
        }
        guaranteeService.registerComplete(shardingItems);
        while (!guaranteeService.isRegisterCompleteSuccess(shardingItems)) {
            BlockUtils.waitingShortTime();
        }
        if (guaranteeService.isAllCompleted()) {
            doAfterJobExecutedAtLastCompleted(shardingContexts);
            guaranteeService.clearAllCompletedInfo();
            return;
        }
        long before = timeService.getCurrentMillis();
        try {
            synchronized (completedWait) {
                completedWait.wait(completedTimeoutMilliseconds);
            }
        } catch (final InterruptedException ex) {
            Thread.interrupted();
        }
        if (timeService.getCurrentMillis() - before >= completedTimeoutMilliseconds) {
            guaranteeService.clearAllCompletedInfo();
            handleTimeout(completedTimeoutMilliseconds);
        }
    }

    private void handleTimeout(final long timeoutMilliseconds) {
        throw new JobSystemException("Job timeout. timeout mills is %s.", timeoutMilliseconds);
    }

    /**
     * Do before job executed at last sharding job started.
     *
     * @param shardingContexts sharding contexts
     */
    public abstract void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts);

    /**
     * Do after job executed at last sharding job completed.
     *
     * @param shardingContexts sharding contexts
     */
    public abstract void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts);

    /**
     * Notify waiting task start.
     */
    public void notifyWaitingTaskStart() {
        synchronized (startedWait) {
            startedWait.notifyAll();
        }
    }

    /**
     * Notify waiting task complete.
     */
    public void notifyWaitingTaskComplete() {
        synchronized (completedWait) {
            completedWait.notifyAll();
        }
    }
}
```
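The timed wait and notifyAll interplay in the listing can be tried in isolation with a small sketch. It is not the library's code: `TimedWaiter` and `TimedWaiterDemo` are hypothetical names, and the sketch deliberately differs from the listing in two ways. It adds a `signalled` guard flag so that a notification sent before the wait begins is not lost, and it restores the interrupt flag with `Thread.currentThread().interrupt()` where the listing calls `Thread.interrupted()`.

```java
// Hypothetical helper: a monitor-based timed wait with the same timeout normalization as the listing.
class TimedWaiter {
    private final Object lock = new Object();
    private final long timeoutMillis;
    // Guard flag (an addition of this sketch, not in the original listing).
    private boolean signalled;

    TimedWaiter(long timeoutMillis) {
        // As in the constructor above: a non-positive timeout means "wait practically forever".
        this.timeoutMillis = timeoutMillis <= 0L ? Long.MAX_VALUE : timeoutMillis;
    }

    long getTimeoutMillis() {
        return timeoutMillis;
    }

    // Returns true if a signal arrived before the timeout, false on timeout or interruption.
    boolean awaitSignal() {
        long startNanos = System.nanoTime();
        synchronized (lock) {
            try {
                while (!signalled) {
                    long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
                    long remaining = timeoutMillis - elapsedMillis;
                    if (remaining <= 0L) {
                        return false; // timed out
                    }
                    // wait() releases the monitor while sleeping and re-acquires it on wake-up.
                    lock.wait(remaining);
                }
                return true;
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // Counterpart of notifyWaitingTaskStart(): wakes every thread waiting on the monitor.
    void signalAll() {
        synchronized (lock) {
            signalled = true;
            lock.notifyAll();
        }
    }
}

class TimedWaiterDemo {
    public static void main(String[] args) throws InterruptedException {
        TimedWaiter waiter = new TimedWaiter(2_000L);
        Thread node = new Thread(() ->
                System.out.println("woken before timeout: " + waiter.awaitSignal()));
        node.start();
        waiter.signalAll();
        node.join();
    }
}
```

The loop around `wait` also guards against spurious wake-ups, which the Java documentation for `Object.wait` says callers should expect.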
As the code shows, the class mainly overrides beforeJobExecuted and afterJobExecuted. Both methods work on the same principle, so we analyze the execution steps of beforeJobExecuted only.
- Register zk nodes. The registration directory is /${job_name}/guarantee/started/${node}, and the registered nodes are persistent nodes.
- Determine whether all nodes have registered: check whether the number of child nodes under /${job_name}/guarantee/started/ equals the number of shards.
- If they are equal, execute the before method and clear the node path. StartedNodeRemovedJobListener then observes the node change and wakes up all waiting object instances.
- If they are not equal, the node calls wait on the object lock with the start timeout. While it sleeps it releases the object lock, and the other nodes go through the same registration and check whether all nodes have registered successfully.
  - If the wait timed out, the node path is cleared, StartedNodeRemovedJobListener observes the node change and wakes up all waiting object instances, and an exception is thrown.
  - If the wait did not time out, the node was woken by the notification and beforeJobExecuted returns normally.
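The steps above can be walked through with a small single-threaded simulation. It is a sketch under loud assumptions: `InMemoryStartedRegistry` is a hypothetical in-memory stand-in for the children of /${job_name}/guarantee/started/ and for the methods of GuaranteeService used in the listing, the cleared-callback plays the role of StartedNodeRemovedJobListener, and waiting is only recorded as an event instead of actually blocking a thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical in-memory stand-in for the zk "started" directory and the GuaranteeService calls.
class InMemoryStartedRegistry {
    private final int shardingTotalCount;
    private final Set<Integer> startedItems = new TreeSet<>();
    private final List<Runnable> clearedCallbacks = new ArrayList<>();

    InMemoryStartedRegistry(int shardingTotalCount) {
        this.shardingTotalCount = shardingTotalCount;
    }

    // Plays the role of StartedNodeRemovedJobListener: invoked when the entries are removed.
    void onCleared(Runnable callback) {
        clearedCallbacks.add(callback);
    }

    void registerStart(Set<Integer> shardingItems) {
        startedItems.addAll(shardingItems);
    }

    // All started when the number of registered children equals the number of shards.
    boolean isAllStarted() {
        return startedItems.size() == shardingTotalCount;
    }

    void clearAllStartedInfo() {
        startedItems.clear();
        for (Runnable callback : clearedCallbacks) {
            callback.run();
        }
    }
}

class OnceListenerFlowDemo {
    // Registers the sharding items one after another and records what each one would do.
    static List<String> simulate(int shardCount) {
        List<String> events = new ArrayList<>();
        InMemoryStartedRegistry registry = new InMemoryStartedRegistry(shardCount);
        registry.onCleared(() -> events.add("wake up waiting nodes"));
        for (int item = 0; item < shardCount; item++) {
            registry.registerStart(Collections.singleton(item));
            if (registry.isAllStarted()) {
                // The last item to register runs the callback and clears the directory.
                events.add("item " + item + ": run doBeforeJobExecutedAtLastStarted");
                registry.clearAllStartedInfo();
            } else {
                // In the real listener this is startedWait.wait(startedTimeoutMilliseconds).
                events.add("item " + item + ": wait");
            }
        }
        return events;
    }

    public static void main(String[] args) {
        simulate(3).forEach(System.out::println);
    }
}
```

With three shards, only the item that registers last runs the callback, and the clear that follows is what triggers the wake-up of the two items that were waiting.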
[Flow chart: summary of the steps above]
Summary
That is all on the listeners for now. If you run into problems we can keep discussing them, and corrections are welcome!