当前位置:网站首页>【Elastic-Job源码分析】——作业监听器
【Elastic-Job源码分析】——作业监听器
2022-07-31 05:10:00 【火腿编程】
概述
参考文档:芋道源码-Elastic Job Lite
下载源码地址:Elastic-Job源码地址
Elastic-Job官网地址:【Elastic-Job官网文档地址】
Elastic-Job代码版本:3.0.0-RC1-SNAPSHOT
内容
作业监听器uml图【插播idea的工具[Diagrams],对分析源码帮助很大,建议使用】
1. ElasticJobListener,每台作业节点均执行,又名本地监听器 2. AbstractDistributeOnceElasticJobListener实现了ElasticJobListener,重写beforeJobExecuted和afterJobExecuted,分布式场景中仅单一节点执行 3. AbstractDistributeOnceElasticJobListener依赖GuaranteeService类对zk节点的操作,实现单一节点操作,他们是组合关系 4. AbstractListenerManager是监听器管理接口,定义了添加监听类的方法 5. GuaranteeListenerManager实现了AbstractListenerManager,初始化的时候实例化了StartedNodeRemovedJobListener和CompletedNodeRemovedJobListener,两个类监听节点的变化,达到控制AbstractDistributeOnceElasticJobListener前置监听和后置监听实例的唤醒
ElasticJobListener
源码中主要是两个方法:beforeJobExecuted和afterJobExecuted,若作业处理作业服务器的文件,处理完成后删除文件,可考虑使用每个节点均执行清理任务。此类型任务实现简单且无需考虑全局分布式任务是否完成,请尽量使用此类型监听器public interface ElasticJobListener { /** * 作业执行前的执行的方法. * * @param shardingContexts 分片上下文 */ void beforeJobExecuted(final ShardingContexts shardingContexts); /** * 作业执行后的执行的方法. * * @param shardingContexts 分片上下文 */ void afterJobExecuted(final ShardingContexts shardingContexts);}
AbstractDistributeOnceElasticJobListener
该接口实现ElasticJobListener,主要是重写了beforeJobExecuted和afterJobExecuted方法,只让多个节点中一个节点去执行。在这个实现中依赖了GuaranteeService功能,实现分布式场景中仅单一节点执行public abstract class AbstractDistributeOnceElasticJobListener implements ElasticJobListener { /** * 开始超时时间,避免作业不同步导致的死锁 */ private final long startedTimeoutMilliseconds; /** * 开始等待对象,避免作业不同步导致的死锁 */ private final Object startedWait = new Object(); /** * 完成超时时间 */ private final long completedTimeoutMilliseconds; /** * 完成等待对象 */ private final Object completedWait = new Object(); /** * 保证分布式任务全部开始和结束状态的服务 */ @Setter private GuaranteeService guaranteeService; private final TimeService timeService = new TimeService(); public AbstractDistributeOnceElasticJobListener(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) { this.startedTimeoutMilliseconds = startedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : startedTimeoutMilliseconds; this.completedTimeoutMilliseconds = completedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : completedTimeoutMilliseconds; } @Override public final void beforeJobExecuted(final ShardingContexts shardingContexts) { Set<Integer> shardingItems = shardingContexts.getShardingItemParameters().keySet(); if (shardingItems.isEmpty()) { return; } // 注册作业分片项开始运行 guaranteeService.registerStart(shardingItems); // 判断是否所有的分片项开始运行,未全部运行,本线程休眠100ms while (!guaranteeService.isRegisterStartSuccess(shardingItems)) { BlockUtils.waitingShortTime(); } // 判断是否所有的分片项开始运行,开始线程 if (guaranteeService.isAllStarted()) { // 执行 doBeforeJobExecutedAtLastStarted(shardingContexts); // 清理启动信息 guaranteeService.clearAllStartedInfo(); return; } long before = timeService.getCurrentMillis(); try { synchronized (startedWait) { startedWait.wait(startedTimeoutMilliseconds); } } catch (final InterruptedException ex) { Thread.interrupted(); } if (timeService.getCurrentMillis() - before >= startedTimeoutMilliseconds) { // 清理启动信息 guaranteeService.clearAllStartedInfo(); // 抛出异常信息 handleTimeout(startedTimeoutMilliseconds); } } @Override public final void afterJobExecuted(final ShardingContexts shardingContexts) { Set<Integer> shardingItems = shardingContexts.getShardingItemParameters().keySet(); if (shardingItems.isEmpty()) { return; } guaranteeService.registerComplete(shardingItems); while (!guaranteeService.isRegisterCompleteSuccess(shardingItems)) { BlockUtils.waitingShortTime(); } if (guaranteeService.isAllCompleted()) { doAfterJobExecutedAtLastCompleted(shardingContexts); guaranteeService.clearAllCompletedInfo(); return; } long before = timeService.getCurrentMillis(); try { synchronized (completedWait) { completedWait.wait(completedTimeoutMilliseconds); } } catch (final InterruptedException ex) { Thread.interrupted(); } if (timeService.getCurrentMillis() - before >= completedTimeoutMilliseconds) { guaranteeService.clearAllCompletedInfo(); handleTimeout(completedTimeoutMilliseconds); } } private void handleTimeout(final long timeoutMilliseconds) { throw new JobSystemException("Job timeout. timeout mills is %s.", timeoutMilliseconds); } /** * Do before job executed at last sharding job started. * * @param shardingContexts sharding contexts */ public abstract void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts); /** * Do after job executed at last sharding job completed. * * @param shardingContexts sharding contexts */ public abstract void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts); /** * Notify waiting task start. */ public void notifyWaitingTaskStart() { synchronized (startedWait) { startedWait.notifyAll(); } } /** * Notify waiting task complete. */ public void notifyWaitingTaskComplete() { synchronized (completedWait) { completedWait.notifyAll(); } }
}
```
从代码中可以看出,主要是重写了beforeJobExecuted和afterJobExecuted,其中两个方法的实现原理均是一样的,我们单从beforeJobExecuted分析执行步骤
- zk节点注册,注册的目录为
/${job_name}/guarantee/started/${节点}
,注册节点为永久节点 - 判断所有的节点是否均已注册功能:zk节点
/${job_name}/guarantee/started/
子节点的数量和分片的数量是否相等 - if【相等】开始执行前置方法,并清除节点路径,此时StartedNodeRemovedJobListener监听到节点变化,将所有对象实例唤醒
- if【不相等】通过使用对象锁,调用wait方法先判断对象的是否超时,若未超时将清除节点路径,此时StartedNodeRemovedJobListener监听到节点变化,将所有对象实例唤醒;并抛出异常;
-若未超时,本节点休眠,并释放了对象锁,其他节点获取对象锁,重新判断所有的节点是否均已注册成功
总结以上的流程图为:
总结
监听器暂时讲解到这里,如果有问题我们可以继续探讨,欢迎指教!
边栏推荐
猜你喜欢
Volatility取证工具使用日记
Memcached :安装
Error: Cannot find module ‘D:\Application\nodejs\node_modules\npm\bin\npm-cli.js‘
Swordsman Offer Special Assault Edition ---- Day 6
leetcode-2321. 拼接数组的最大分数(差分+枚举)
Swordsman Offer Special Assault Edition --- Day 3
leetcode-每日一题735. 行星碰撞(栈模拟)
Object Detection Study Notes
闭包(五)----一个常见的循环
局部变量成员变量、引用类型、this,static(第五天)
随机推荐
leetcode-438. 找到字符串中所有字母异位词(滑动窗口)
uni-app进阶之模版语法与数据绑定【day7】
Redis的初识
C语言教程(三)-if和循环
继承、Super,重写、抽象类、抽象方法 1(第七天)
tf.keras.utils.pad_sequences()
C语言实验五 循环结构程序设计(二)
Simple command of mysql
联盟链的真实场景在哪里
Flask 的初识
变量的解构赋值
剑指offer基础版 ----- 第25天
了解SSRF,这一篇就足够了
C语言文件读、写、定位函数
Flink sink redis writes to Redis
Qt Creator + CMake 运行调试总会自动 build 所有目标
leetcode-每日一题735. 行星碰撞(栈模拟)
Flask-based three-party login process
C语言教程(二)-printf及c自带的数据类型
Object Detection Study Notes