xxl-job Source Code Analysis (Tech Sharing)
2022-07-30 14:55:00 【segegefe】
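1. Concepts
1. Basic concepts of scheduled tasks
A scheduled task is an execution plan that a program prepares in advance to carry out an information-processing job. Its core components are:
- Executor: manages the application's runtime environment and runs the scheduled tasks.
- Job: the execution flow of the task; a class that contains the concrete business logic.
- Trigger: fires the scheduled task according to some time rule.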
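To make the three roles concrete, here is a minimal sketch that uses the JDK's own ScheduledExecutorService rather than xxl-job: the scheduler thread plays the executor, a Runnable is the job, and the fixed-rate rule is the trigger. The class NativeScheduleDemo and its method are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class NativeScheduleDemo {
    // Returns true if the job ran at least `times` times within 5 seconds
    static boolean runFixedRate(int times, long periodMs) {
        // "Executor": a JDK scheduler thread that provides the runtime environment
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(times);
        // "Job": the business logic (here it only counts down)
        Runnable job = remaining::countDown;
        // "Trigger": the time rule "start now, then every periodMs milliseconds"
        executor.scheduleAtFixedRate(job, 0, periodMs, TimeUnit.MILLISECONDS);
        try {
            return remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Everything here lives inside one JVM: two deployed instances would each run the job, and changing the period means redeploying, which is the kind of limitation discussed under native scheduled tasks below.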
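2. Use cases for scheduled tasks
In day-to-day development, scheduled tasks fall into two main categories:
Time-driven:
- Statement reconciliation, end-of-day settlement
- Marketing SMS messages
- Periodically checking MQ for messages that failed to be produced
Data-driven:
- Asynchronous data exchange
- Data synchronization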
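3. What are the shortcomings of native scheduled tasks?
In the era of distributed systems, the shortcomings of native scheduled tasks stand out even more. Drawing on both traditional projects and distributed microservice architectures, I summarize them below; additions are welcome:
- No support for multi-node cluster deployment: you have to prevent duplicate execution yourself.
- No support for sharded tasks, i.e. splitting ordered data so that several machines each process a different slice.
- No dynamic adjustment: task parameters cannot be changed without restarting the service.
- No alerting: nobody is notified when a task fails.
- No unified lifecycle management, such as stopping or starting a task without restarting the service.
- No failure retry: the task simply ends on an exception and cannot be re-run based on its status.
- No task statistics: when the volume of task data is large, there is no efficient way to analyze how tasks have run.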
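4. What can we do with XXL-JOB?
- Executor HA (distributed): distributed task execution is supported out of the box, with nothing to implement yourself. Task "executors" can be deployed as a cluster, which keeps task execution highly available;
- Scheduling center HA (centralized): the scheduling center plays the role of the trigger in traditional scheduling. Scheduling follows a centralized design: the "scheduling center" has its own scheduling component and supports cluster deployment, which keeps the scheduling center highly available;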

2. System architecture and overall flow
https://www.xuxueli.com/xxl-job/
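2.1. Design philosophy
- Scheduling behavior is abstracted into a common "scheduling center" platform that carries no business logic itself; the scheduling center only issues scheduling requests.
- Tasks are abstracted into individual JobHandlers managed by "executors"; an executor receives scheduling requests and runs the business logic in the corresponding JobHandler.
- As a result, "scheduling" and "tasks" are decoupled from each other, which improves the overall stability and scalability of the system;
2.2. Architecture diagram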

2.3. Execution flow

3. Startup flow
3.1. Server startup (scheduling center)

First, find the configuration class XxlJobAdminConfig. It implements the InitializingBean interface, so we can go straight to its afterPropertiesSet method.
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
// ---------------------- XxlJobScheduler ----------------------
private XxlJobScheduler xxlJobScheduler;
@Override
public void afterPropertiesSet() throws Exception {
adminConfig = this;
// initialize the xxl-job scheduler
xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
}
...
}
Then XxlJobScheduler#init():
public void init() throws Exception {
// init i18n
initI18n();
// admin trigger pool start
// initialize the trigger thread pools
JobTriggerPoolHelper.toStart();
// admin registry monitor run
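/**
* Runs every 30s to maintain the registry; a node is considered offline after 90s without a heartbeat
* 1. Delete executor nodes (jobRegistry) that have had no heartbeat for 90s
* 2. Collect all registered nodes and update them into jobGroup (executor)
*/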
JobRegistryHelper.getInstance().start();
// admin fail-monitor run: failure monitor, mainly sends alert emails and retries failed triggers
JobFailMonitorHelper.getInstance().start();
// admin lose-monitor run ( depend on JobTriggerPoolHelper )
// marks the scheduling logs of lost executor hosts as failed
JobCompleteHelper.getInstance().start();
// admin log report start: aggregates success/failure report statistics
JobLogReportHelper.getInstance().start();
// start-schedule ( depend on JobTriggerPoolHelper )
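/**
* The scheduler runs jobs with two threads plus a thread pool that executes the trigger logic
* 1. Schedule thread: scans about once per second (waits about 5s when nothing was pre-read), queries jobs due within the next 5s and handles each according to its state
* 2. Ring thread: runs once per second; time-wheel algorithm that also checks one tick back;
*/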
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
3.2. Client startup (executor)

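Here we look at XxlJobSpringExecutor. It implements SmartInitializingSingleton, so Spring calls its afterSingletonsInstantiated() method once all non-lazy singleton beans have been instantiated during container startup; at that point every job-handler bean already exists. It also implements DisposableBean, so destroy() is called when the bean is destroyed.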
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);
// start
@Override
public void afterSingletonsInstantiated() {
// init JobHandler Repository
/*initJobHandlerRepository(applicationContext);*/
// init JobHandler Repository (for method): build the job handler repository
/**
* ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
* key: handler name; value: MethodJobHandler (reflection info: Object, Bean, initMethod, destroyMethod)
*/
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
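The idea behind initJobHandlerMethodRepository (scan beans for annotated methods and keep them in a name-to-handler map) can be sketched without Spring. The annotation @JobHandlerName and the classes MethodHandlerRef, HandlerRepository and DemoJobs are hypothetical stand-ins for xxl-job's @XxlJob and MethodJobHandler, not their real code; init/destroy methods are omitted.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for xxl-job's @XxlJob annotation
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface JobHandlerName {
    String value();
}

// Hypothetical stand-in for MethodJobHandler: the target bean plus the method to invoke
class MethodHandlerRef {
    final Object target;
    final Method method;

    MethodHandlerRef(Object target, Method method) {
        this.target = target;
        this.method = method;
    }

    Object execute() {
        try {
            return method.invoke(target);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("job handler invocation failed", e);
        }
    }
}

class HandlerRepository {
    private final Map<String, MethodHandlerRef> repository = new ConcurrentHashMap<>();

    // Scan one bean for annotated methods and register them by handler name
    void scan(Object bean) {
        for (Method m : bean.getClass().getDeclaredMethods()) {
            JobHandlerName ann = m.getAnnotation(JobHandlerName.class);
            if (ann == null) {
                continue;
            }
            m.setAccessible(true); // allow non-public handler methods
            if (repository.putIfAbsent(ann.value(), new MethodHandlerRef(bean, m)) != null) {
                throw new IllegalStateException("duplicate job handler name: " + ann.value());
            }
        }
    }

    MethodHandlerRef load(String name) {
        return repository.get(name);
    }
}

// Example "bean" with one handler method
class DemoJobs {
    @JobHandlerName("demoJobHandler")
    public String demo() {
        return "ran";
    }
}
```

When a trigger later arrives with executorHandler = "demoJobHandler", the executor only needs a map lookup followed by a reflective invoke, which is why handler names have to be unique.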
Next, look at super.start():
public void start() throws Exception {
// init logpath: initialize the log directory where execution logs are written to disk
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client: initialize the list of scheduling-center (admin) clients
// each AdminBizClient is configured with addressUrl + accessToken
initAdminBizList(adminAddresses, accessToken);
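// init JobLogFileCleanThread: delete expired logs (older than logRetentionDays, e.g. 30 days)
// log directories are named by date and deleted based on that date; runs once a day in a daemon thread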
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread: reports task execution status back to the scheduling center
TriggerCallbackThread.getInstance().start();
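// init executor-server: start the embedded server
/**
* 1. Open a port with Netty and wait for calls from the scheduling center
* 2. Keep sending heartbeats to the scheduling center (every 30s)
* 3. Ask the scheduling center to remove this node on shutdown
*/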
initEmbedServer(address, ip, port, appname, accessToken);
}
4. Service registration

1. Task executor
com.xxl.job.core.thread.ExecutorRegistryThread#start
public void start(final String appname, final String address){
...
registryThread = new Thread(new Runnable() {
@Override
public void run() {
// registry
while (!toStop) {
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
// iterate over all scheduling-center addresses
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
// sleep 30s, i.e. register once every 30s
if (!toStop) {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
}
} catch (InterruptedException e) {
if (!toStop) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
}
}
}
// registry remove
// once the loop stops, actively deregister
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
...
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
if (!toStop) {
logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
}
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
...
}
});
// run as a daemon thread
registryThread.setDaemon(true);
registryThread.setName("xxl-job, executor ExecutorRegistryThread");
registryThread.start();
}
Now look at its RPC call: HTTP is the transport protocol and JSON is the serialization format.
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
// For details see com.xxl.job.core.util.XxlJobRemotingUtil: postBody uses HTTP, and GsonTool converts objects to JSON.
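A rough equivalent of postBody can be written with the JDK 11+ java.net.http client. The sketch only builds the request (it sends nothing) so you can see what goes on the wire: a POST to addressUrl + "api/registry", a JSON body, and the access token in a header. The header name XXL-JOB-ACCESS-TOKEN is an assumption about the value of XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN, the JSON is hand-built instead of using GsonTool, and the class RegistryRequestSketch is hypothetical; the field names follow RegistryParam's getters shown further below.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

class RegistryRequestSketch {
    // Assumed to match XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN
    static final String ACCESS_TOKEN_HEADER = "XXL-JOB-ACCESS-TOKEN";

    // Minimal JSON string quoting for the demo (escapes backslashes and quotes only)
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    static String registryJson(String group, String key, String value) {
        return "{\"registryGroup\":" + quote(group)
                + ",\"registryKey\":" + quote(key)
                + ",\"registryValue\":" + quote(value) + "}";
    }

    // Builds (but does not send) the registry POST request
    static HttpRequest buildRegistryRequest(String addressUrl, String accessToken, int timeoutSec,
                                            String group, String key, String value) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(addressUrl + "api/registry"))
                .timeout(Duration.ofSeconds(timeoutSec))
                .header("Content-Type", "application/json;charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(registryJson(group, key, value)));
        // only send the token header when a token is configured
        if (accessToken != null && !accessToken.trim().isEmpty()) {
            builder.header(ACCESS_TOKEN_HEADER, accessToken);
        }
        return builder.build();
    }
}
```

To actually send it you would pass the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and parse the JSON response into a ReturnT-like object.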
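2. Scheduling center
Next, let's see how the scheduling center receives requests from executors.
JobApiController is a Spring MVC controller responsible for mapping these requests: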
@RequestMapping("/{uri}")
@ResponseBody
@PermissionLimit(limit=false)
public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {
// valid
if (!"POST".equalsIgnoreCase(request.getMethod())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
}
if (uri==null || uri.trim().length()==0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
}
if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null
&& XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0
&& !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
// services mapping
/**
* 1. Update the scheduling log status;
* 2. If the executor succeeded and the job has child jobs, trigger the child jobs
*/
if ("callback".equals(uri)) {
List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
return adminBiz.callback(callbackParamList);
}
// service registration
else if ("registry".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registry(registryParam);
}
// service deregistration
else if ("registryRemove".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registryRemove(registryParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}
}
The registration logic that adminBiz.registry() eventually calls:
public ReturnT<String> registry(RegistryParam registryParam) {
// valid
if (!StringUtils.hasText(registryParam.getRegistryGroup())
|| !StringUtils.hasText(registryParam.getRegistryKey())
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}
// async execute: register asynchronously
registryOrRemoveThreadPool.execute(new Runnable() {
@Override
public void run() {
// refresh the update time (heartbeat) of an existing registration
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
if (ret < 1) {
// no row was updated, so this node is new: insert it
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
// fresh (empty implementation)
freshGroupRegistryInfo(registryParam);
}
}
});
return ReturnT.SUCCESS;
}
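Registration therefore boils down to "refresh the heartbeat time, insert only if no row was updated", combined with the registry monitor from section 3.1 that drops nodes without a heartbeat for 90s. The in-memory sketch below models both with a map instead of the registry table; the class InMemoryRegistry and its methods are hypothetical, and the 90s constant mirrors the timeout described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class InMemoryRegistry {
    static final long DEAD_TIMEOUT_MS = 90_000; // three missed 30s heartbeats

    // key: appname + "|" + address, value: last heartbeat time in ms
    private final Map<String, Long> updateTimes = new ConcurrentHashMap<>();

    // Like registryUpdate(): returns the number of "rows" updated (0 or 1)
    int registryUpdate(String appname, String address, long nowMs) {
        return updateTimes.computeIfPresent(appname + "|" + address, (k, v) -> nowMs) != null ? 1 : 0;
    }

    // Like registrySave(): inserts a new "row"
    void registrySave(String appname, String address, long nowMs) {
        updateTimes.putIfAbsent(appname + "|" + address, nowMs);
    }

    // Update first; insert only when nothing was updated
    void registry(String appname, String address, long nowMs) {
        if (registryUpdate(appname, address, nowMs) < 1) {
            registrySave(appname, address, nowMs);
        }
    }

    // Registry-monitor step: drop nodes whose last heartbeat is older than 90s
    void removeDead(long nowMs) {
        updateTimes.entrySet().removeIf(e -> nowMs - e.getValue() > DEAD_TIMEOUT_MS);
    }

    // Online addresses for one appname (what would be written back to the job group)
    List<String> onlineAddresses(String appname) {
        List<String> result = new ArrayList<>();
        for (String key : updateTimes.keySet()) {
            int sep = key.indexOf('|');
            if (key.substring(0, sep).equals(appname)) {
                result.add(key.substring(sep + 1));
            }
        }
        return result;
    }
}
```

Passing the clock in as nowMs keeps the sketch deterministic; the real code uses new Date() and a database table, so several admin nodes share one registry.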
5. Manual triggering

1. Scheduling center
Trigger entry point: com.xxl.job.admin.controller.JobInfoController#triggerJob
@RequestMapping("/trigger")
@ResponseBody
//@PermissionLimit(limit = false)
public ReturnT<String> triggerJob(int id, String executorParam, String addressList) {
// force cover job param: default to an empty string
if (executorParam == null) {
executorParam = "";
}
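// args: jobId, trigger type (MANUAL), fail retry count (-1 = use the job's configured value), executor sharding param (format like 1/2), executor param, address list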
JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);
return ReturnT.SUCCESS;
}
JobTriggerPoolHelper.trigger() delegates to addTrigger():
public void addTrigger(final int jobId,
final TriggerTypeEnum triggerType,
final int failRetryCount,
final String executorShardingParam,
final String executorParam,
final String addressList) {
// choose thread pool (the fast pool by default)
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
// get this job's timeout count
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
// more than 10 timeouts within one minute: use the slow trigger pool
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}
// trigger
triggerPool_.execute(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
// do trigger
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// check timeout-count-map: roll over when a new minute starts
long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now) {
minTim = minTim_now; // a new minute has started: clear the timeout counters
jobTimeoutCountMap.clear();
}
// incr timeout-count-map
long cost = System.currentTimeMillis()-start;
if (cost > 500) { // job-timeout threshold 500ms
// the trigger took over 500ms: increment this job's timeout count
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if (timeoutCount != null) {
timeoutCount.incrementAndGet();
}
}
}
}
});
}
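Note: if a job's triggers take longer than 500 ms more than 10 times within one minute, that job's subsequent triggers are dispatched to the slow trigger pool.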
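The fast/slow switch is just a per-job counter that is reset every minute. Below is a self-contained sketch of that bookkeeping only; the class TriggerPoolSelector and its methods are hypothetical, while the 500 ms threshold and the "more than 10 per minute" rule come from addTrigger above. Unlike addTrigger, it takes the current time as a parameter so it can be tested deterministically.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class TriggerPoolSelector {
    private volatile long currentMinute = -1;
    private final ConcurrentMap<Integer, AtomicInteger> timeoutCounts = new ConcurrentHashMap<>();

    // true -> dispatch this job's next trigger to the slow pool
    boolean useSlowPool(int jobId) {
        AtomicInteger count = timeoutCounts.get(jobId);
        return count != null && count.get() > 10;
    }

    // Called after each trigger finishes, with its cost and the wall-clock time
    void recordTrigger(int jobId, long costMs, long nowMs) {
        long minute = nowMs / 60_000;
        if (minute != currentMinute) {
            // a new minute has started: forget the old counters
            currentMinute = minute;
            timeoutCounts.clear();
        }
        if (costMs > 500) {
            timeoutCounts.computeIfAbsent(jobId, id -> new AtomicInteger()).incrementAndGet();
        }
    }
}
```

Isolating slow jobs this way keeps one misbehaving job (for example, one whose executor responds slowly) from starving the fast pool that all other jobs share.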
XxlJobTrigger.trigger() then calls processTrigger() (once per executor under sharding broadcast):
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
// param: get the block strategy (defaults to SERIAL_EXECUTION)
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
// get the route strategy (null if the configured value does not match)
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
// 1、save log-id: persist the scheduling log
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
jobLog.setTriggerTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2、init trigger-param
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(index);
triggerParam.setBroadcastTotal(total);
// 3、init address: resolve the executor address
String address = null;
ReturnT<String> routeAddressResult = null;
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
// the route strategy is sharding broadcast
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
address = group.getRegistryList().get(0);
}
} else {
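// apply the configured route strategy to pick an address (Strategy pattern)
/**
* 1. ExecutorRouteFirst (first): always picks the first machine
* 2. ExecutorRouteLast (last)
* 3. ExecutorRouteRound (round robin): keeps a per-job call count in a Map and takes it modulo the address count
* 4. ExecutorRouteRandom (random)
* 5. ExecutorRouteConsistentHash (consistent hashing): each jobId hashes to a fixed machine; the virtual nodes are rebuilt on every call
* 6. ExecutorRouteLFU (least frequently used, over 1 day): a Map stores each jobId's usage count per address and the least-used address is chosen
* 7. ExecutorRouteLRU (least recently used): implemented with a LinkedHashMap in access order, which is backed by a doubly linked list
* 8. ExecutorRouteFailover (failover): walks the executor addresses in order, running a heartbeat check on each
* 9. ExecutorRouteBusyover (busy-over): runs an idle check on each address in order; the first machine that passes becomes the target executor and is scheduled
*/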
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
}
} else {
routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
}
// 4、trigger remote executor
ReturnT<String> triggerResult = null;
if (address != null) {
// executor address resolved: dispatch the trigger over HTTP
triggerResult = runExecutor(triggerParam, address);
} else {
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
}
// 5、collection trigger info
StringBuffer triggerMsgSb = new StringBuffer();
triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
.append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
if (shardingParam != null) {
triggerMsgSb.append("("+shardingParam+")");
}
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
.append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");
// 6、save log trigger-info
jobLog.setExecutorAddress(address);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setExecutorShardingParam(shardingParam);
jobLog.setExecutorFailRetryCount(finalFailRetryCount);
//jobLog.setTriggerTime();
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
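Two of the strategies listed in the comment above are simple enough to sketch. RoundRobinRouter and LruRouter are hypothetical simplifications: this round robin always starts at the first address, and neither class ever clears its per-job state, which the real routers may do.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinRouter {
    private final Map<Integer, AtomicInteger> counters = new ConcurrentHashMap<>();

    String route(int jobId, List<String> addresses) {
        int n = counters.computeIfAbsent(jobId, id -> new AtomicInteger()).getAndIncrement();
        // floorMod keeps the index non-negative even after int overflow
        return addresses.get(Math.floorMod(n, addresses.size()));
    }
}

class LruRouter {
    // one access-ordered map per job: iteration starts at the least recently used address
    private final Map<Integer, LinkedHashMap<String, String>> perJob = new ConcurrentHashMap<>();

    synchronized String route(int jobId, List<String> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("no executor address");
        }
        LinkedHashMap<String, String> lru = perJob.computeIfAbsent(jobId,
                id -> new LinkedHashMap<>(16, 0.75f, true));
        // add newly registered addresses; containsKey does not change the access order
        for (String address : addresses) {
            if (!lru.containsKey(address)) {
                lru.put(address, address);
            }
        }
        // drop addresses that went offline
        lru.keySet().retainAll(addresses);
        String eldest = lru.keySet().iterator().next();
        lru.get(eldest); // mark it as most recently used
        return eldest;
    }
}
```

The LRU version relies on LinkedHashMap's access-order mode: every get() moves the entry to the tail, so the head is always the address that has gone longest without being chosen.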
2. Task executor
com.xxl.job.core.server.EmbedServer#process()
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
// valid
if (HttpMethod.POST != httpMethod) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
}
if (uri == null || uri.trim().length() == 0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
}
if (accessToken != null
&& accessToken.trim().length() > 0
&& !accessToken.equals(accessTokenReq)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
// services mapping
try {
switch (uri) {
case "/beat":
return executorBiz.beat();
case "/idleBeat":
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
// trigger job execution
case "/run":
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
case "/kill":
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
case "/log":
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
default:
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
}
}
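process() hands /run requests to executorBiz.run(), which is not shown here; that is where the job's JobThread is looked up and the block strategy from processTrigger (ExecutorBlockStrategyEnum: SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY) is applied before the TriggerParam is queued. The following is a simplified, hypothetical model of that decision rather than xxl-job's code: JobSlot stands in for a JobThread and only holds a queue and a running flag.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Mirrors the three block strategy names used by xxl-job
enum BlockStrategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

// Hypothetical stand-in for a JobThread: a queue of log ids plus a "running" flag
class JobSlot {
    final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>();
    volatile boolean running;

    boolean isRunningOrHasQueue() {
        return running || !queue.isEmpty();
    }
}

class BlockStrategyDemo {
    // Returns the slot that receives the trigger, or null if the trigger is discarded
    static JobSlot accept(JobSlot current, BlockStrategy strategy, long logId) {
        JobSlot target = current;
        if (current != null && current.isRunningOrHasQueue()) {
            if (strategy == BlockStrategy.DISCARD_LATER) {
                return null; // busy: drop the new trigger
            }
            if (strategy == BlockStrategy.COVER_EARLY) {
                target = new JobSlot(); // busy: replace the old slot (its thread would be stopped)
            }
            // SERIAL_EXECUTION: queue behind the running trigger
        }
        if (target == null) {
            target = new JobSlot(); // first trigger for this job: create a slot
        }
        target.queue.offer(logId);
        return target;
    }
}
```

Whichever branch is taken, the actual work is never done on the HTTP thread: the request only enqueues, and the job's own thread picks it up.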
JobThread runs the scheduling logic:
com.xxl.job.core.thread.JobThread#run()
@Override
public void run() {
// init
try {
// run the handler's init method once when the JobThread starts (e.g. to set up connection pools)
handler.init();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
// execute
while(!toStop){
running = false;
idleTimes++;
TriggerParam triggerParam = null;
try {
// to check the toStop signal we need to loop, so we cannot use queue.take(); use poll(timeout) instead
// take the next trigger request from the queue
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (triggerParam!=null) {
running = true;
idleTimes = 0;
triggerLogIdSet.remove(triggerParam.getLogId());
// log filename, like "logPath/yyyy-MM-dd/9999.log" (the execution log file)
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
XxlJobContext xxlJobContext = new XxlJobContext(
triggerParam.getJobId(),
triggerParam.getExecutorParams(),
logFileName,
triggerParam.getBroadcastIndex(),
triggerParam.getBroadcastTotal());
// init job context
XxlJobContext.setXxlJobContext(xxlJobContext);
// execute
XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
// a timeout is set: run in a separate thread and wait with FutureTask.get(timeout)
if (triggerParam.getExecutorTimeout() > 0) {
// limit timeout
Thread futureThread = null;
try {
FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
// init job context
XxlJobContext.setXxlJobContext(xxlJobContext);
handler.execute();
return true;
}
});
futureThread = new Thread(futureTask);
futureThread.start();
Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
} catch (TimeoutException e) {
XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
XxlJobHelper.log(e);
// handle result
XxlJobHelper.handleTimeout("job execute timeout ");
} finally {
futureThread.interrupt();
}
} else {
// just execute
// no timeout set: invoke the handler (via reflection) directly in this thread
handler.execute();
}
// record the execution result
// valid execute handle data
if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
XxlJobHelper.handleFail("job handle result lost.");
} else {
String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
?tempHandleMsg.substring(0, 50000).concat("...")
:tempHandleMsg;
XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
}
XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
+ XxlJobContext.getXxlJobContext().getHandleCode()
+ ", handleMsg = "
+ XxlJobContext.getXxlJobContext().getHandleMsg()
);
} else {
// idle for more than 30 polls (about 90s at 3s per poll) with an empty queue: remove and stop this thread
if (idleTimes > 30) {
if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
}
}
}
}
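/**
* If scheduling throws, the exception is caught here and the run is marked as failed via XxlJobHelper.handleFail(errorMsg);
* so a JobHandler should throw an exception when its business logic fails
*/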
catch (Throwable e) {
if (toStop) {
XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
}
// handle result
StringWriter stringWriter = new StringWriter();
e.printStackTrace(new PrintWriter(stringWriter));
String errorMsg = stringWriter.toString();
XxlJobHelper.handleFail(errorMsg);
XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
} finally {
if(triggerParam != null) {
// callback handler info
// push onto the callback queue
if (!toStop) {
// common
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.getXxlJobContext().getHandleCode(),
XxlJobContext.getXxlJobContext().getHandleMsg() )
);
} else {
// is killed
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.HANDLE_CODE_FAIL,
stopReason + " [job running, killed]" )
);
}
}
}
}
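The core of JobThread, stripped down: poll with a timeout so the stop flag is re-checked, count idle polls, and run the handler through a FutureTask when a timeout is set. MiniJobThread is a hypothetical miniature (timeouts are in milliseconds here instead of seconds, and the result strings are made up); it omits callbacks, logging and the job context.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class MiniJobThread {
    // Each queued element is one trigger's timeout in ms (0 = no timeout)
    final BlockingQueue<Long> triggerQueue = new LinkedBlockingQueue<>();
    volatile boolean toStop;
    private final Callable<String> handler;
    private final int maxIdlePolls;

    MiniJobThread(Callable<String> handler, int maxIdlePolls) {
        this.handler = handler;
        this.maxIdlePolls = maxIdlePolls;
    }

    // Poll (not take) so that toStop is re-checked at least once per pollMillis
    List<String> runLoop(long pollMillis) {
        List<String> results = new ArrayList<>();
        int idleTimes = 0;
        while (!toStop) {
            Long timeoutMs;
            try {
                timeoutMs = triggerQueue.poll(pollMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (timeoutMs != null) {
                idleTimes = 0;
                results.add(runOnce(timeoutMs));
            } else if (++idleTimes > maxIdlePolls && triggerQueue.isEmpty()) {
                break; // idle too long: the real JobThread removes itself at this point
            }
        }
        return results;
    }

    // Runs the handler once; with timeoutMs > 0 it runs in a separate thread with a deadline
    String runOnce(long timeoutMs) {
        if (timeoutMs <= 0) {
            try {
                return handler.call();
            } catch (Exception e) {
                return "FAIL: " + e;
            }
        }
        FutureTask<String> task = new FutureTask<>(handler);
        Thread worker = new Thread(task);
        worker.start();
        try {
            return task.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "TIMEOUT";
        } catch (ExecutionException e) {
            return "FAIL: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "FAIL: interrupted";
        } finally {
            worker.interrupt(); // as in JobThread: always interrupt the worker afterwards
        }
    }
}
```

Note that interrupting the worker only stops handlers that react to interruption (sleeping, blocking I/O, or checking the interrupt flag); a busy loop that ignores it keeps running after the timeout is reported.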
6. Automatic triggering


1. Automatic trigger logic
com.xxl.job.admin.core.thread.JobScheduleHelper#start()
scheduleThread: the scheduling thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
// initial delay: sleep 4-5s, aligned to a whole second, before the first scan
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// pre-read count: threadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
// i.e. assume each pool thread can handle about 20 triggers per second
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
// lock the schedule_lock row with SELECT ... FOR UPDATE (a database-based distributed lock)
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1、pre read
long nowTime = System.currentTimeMillis();
// query jobs due within the next 5s, at most preReadCount rows
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 1、misfire match
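/**
* Misfire strategies:
* - Do nothing: ignore the missed run and recompute the next trigger time from now;
* - Fire once now: run once immediately, then recompute the next trigger time from now;
*/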
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW > trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
}
// 2、fresh next: update the next trigger time
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
// 1、trigger
// fire the trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next: update the next trigger time
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again (the next trigger falls within the next 5s)
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second: the second-of-minute of the next trigger
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring: add to the time wheel
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next: update the next trigger time
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// jobs due within the next 5s are added to ringData
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring: add to the time wheel
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next: update the next trigger time
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3、update trigger info: persist the next and last trigger times to the database
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// tx stop
} catch (Exception e) {
if (!scheduleThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
}
} finally {
// commit
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
long cost = System.currentTimeMillis()-start;
// Wait seconds, align second
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
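Each pre-read job falls into one of three branches depending on how its triggerNextTime compares with now. The pure function below reproduces only that classification, with PRE_READ_MS = 5000 as in the code above; the enum PreReadAction and the class PreReadClassifier are hypothetical, and the misfire handling, database lock and next-time refresh are left out.

```java
enum PreReadAction {
    MISFIRE,       // more than 5s late: apply the misfire strategy, then recompute the next time
    TRIGGER_NOW,   // late by at most 5s: trigger immediately
    PUSH_TO_RING   // due within the next 5s: put into the time ring
}

class PreReadClassifier {
    static final long PRE_READ_MS = 5000;

    static PreReadAction classify(long nowTime, long triggerNextTime) {
        if (nowTime > triggerNextTime + PRE_READ_MS) {
            return PreReadAction.MISFIRE;
        } else if (nowTime > triggerNextTime) {
            return PreReadAction.TRIGGER_NOW;
        } else {
            return PreReadAction.PUSH_TO_RING;
        }
    }

    // Ring slot used for PUSH_TO_RING: the second-of-minute of the trigger time
    static int ringSecond(long triggerNextTime) {
        return (int) ((triggerNextTime / 1000) % 60);
    }
}
```

For example, with now = 100,000 ms a job due at 90,000 is a misfire, one due at 97,000 fires immediately, and one due at 103,000 goes into ring slot 43.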
2. Time-wheel thread
com.xxl.job.admin.core.thread.JobScheduleHelper#start()
ringThread: the time-wheel thread
// ring thread (time-wheel thread)
ringThread = new Thread(new Runnable() {
@Override
public void run() {
while (!ringThreadToStop) {
// align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
// second data
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
// in case processing took too long and a tick was skipped, also check the previous tick;
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData.size() > 0) {
// do trigger
for (int jobId: ringItemData) {
// do trigger
// fire the trigger; from here on the logic is the same as for manual triggering.
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// clear
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
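The ring itself is just a map from second-of-minute to job ids. Here is a small sketch of pushTimeRing and one tick of the ring thread, including the "also check the previous second" rule; the class TimeRing is hypothetical, and the current second is passed in instead of being read from Calendar so the behavior can be tested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class TimeRing {
    // key: second of minute (0-59), value: job ids due in that second
    private final Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    // Assumes a single writer (in xxl-job only the schedule thread pushes)
    void pushTimeRing(int ringSecond, int jobId) {
        ringData.computeIfAbsent(ringSecond, s -> new ArrayList<>()).add(jobId);
    }

    // One tick: collect jobs for this second and the previous one (in case a tick was skipped)
    List<Integer> tick(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> tmp = ringData.remove((nowSecond + 60 - i) % 60);
            if (tmp != null) {
                due.addAll(tmp);
            }
        }
        return due;
    }
}
```

Because remove() takes the slot out of the map, a job can never be fired twice even though each tick looks at two slots.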
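7. Design highlights
1. Routing strategies
- Routing uses the Strategy design pattern: the selected strategy determines which executor address is called;
- Supported strategies: first, last, random, round robin, consistent hash, LRU, LFU, failover, busy-over, and sharding broadcast;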
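Consistent hashing is the least obvious strategy in that list. The sketch below shows the general technique with a TreeMap ring; the number of virtual nodes (100), the virtual-node key format and the MD5-based 32-bit hash are assumptions for illustration and not necessarily what ExecutorRouteConsistentHash uses.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

class ConsistentHashRouter {
    private static final int VIRTUAL_NODES = 100; // assumption for the sketch

    // First 4 bytes of the MD5 digest as an unsigned 32-bit value
    static long hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            return ((long) (d[3] & 0xFF) << 24) | ((long) (d[2] & 0xFF) << 16)
                    | ((long) (d[1] & 0xFF) << 8) | (d[0] & 0xFF);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // every Java platform must provide MD5
        }
    }

    // Rebuilds the ring on every call and picks the first virtual node clockwise from the job's hash
    static String route(int jobId, List<String> addresses) {
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String address : addresses) {
            for (int i = 0; i < VIRTUAL_NODES; i++) {
                ring.put(hash("SHARD-" + address + "-NODE-" + i), address);
            }
        }
        long jobHash = hash(String.valueOf(jobId));
        SortedMap<Long, String> tail = ring.tailMap(jobHash);
        // wrap around to the start of the ring when no node lies after the job's hash
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }
}
```

The useful property is stability: removing an executor other than the one a job maps to does not move that job, so most jobs keep hitting the same machine when the cluster changes.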
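2. Registry
- Renewal: a thread on each executor renews its registration every 30 seconds;
- Expiry: a thread in the scheduling center runs every 30 seconds and removes executors that have not renewed for 90 seconds;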
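3. Fully asynchronous & lightweight
1. Scheduling center
- Scheduling: a thread periodically fetches the jobs due for execution and hands them to the trigger thread pools for asynchronous invocation;
- Heartbeat: a separate thread cleans up expired executors;
- Failed jobs: a thread retries them and sends alerts;
2. Task executor
- Job execution: each job has its own JobThread that takes triggers from its queue;
- Callback: two threads, a callback thread and a retry thread, report task execution status back to xxlAdmin;
- Heartbeat: a separate thread renews the registration every 30s
3. Asynchrony
- Asynchronous invocation: jobs are dispatched to executors asynchronously through thread pools
- Asynchronous execution: on the executor, each job runs in its own thread and reports its result back to xxlAdmin asynchronously;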
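4. Lightweight
- The architecture is very light: distributed locking, the registry, and task scheduling are essentially all built on MySQL, so the only dependencies are MySQL + Java;
- Because everything is asynchronous, a single job run takes on average under "10ms" (essentially the network cost of one request), so a limited number of threads can support a large number of concurrently running jobs;
- According to the official documentation, a single scheduling-center node can in theory support 5,000 concurrently running jobs;
- Ways to increase capacity: 1. scale up the machines; 2. separate jobs by business line; 3. modify the source code so that different xxl-job clusters handle different jobs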
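4. Time-wheel algorithm
1. What it is
- A time wheel brings the idea of a clock into software design: define a clock period (such as the 12 hours on a clock face) and a step (such as the second hand moving once per second); each time the pointer moves one step, it takes the tasks attached to the current slot and executes them.
- A circular array stores all slots of the wheel (picture your watch face); each slot corresponds to the wheel's minimum precision
- Tasks beyond the wheel's maximum range are moved to an upper-level wheel, whose minimum precision equals the maximum time the lower wheel can represent (like hours, minutes, and seconds)
- Each slot holds a circular linked list of the tasks due at that time
- A thread drives the pointer forward and collects the due tasks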
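A single-level wheel is enough to see the mechanics described above: a circular array of slots, a tick, and a pointer that advances one slot per tick. SimpleTimingWheel is a hypothetical, simplified illustration: it is driven by explicit tick() calls instead of a thread, uses an ArrayList per slot rather than a circular linked list, and rejects delays beyond one revolution instead of handing them to an upper-level wheel.

```java
import java.util.ArrayList;
import java.util.List;

class SimpleTimingWheel {
    private final List<List<String>> slots = new ArrayList<>();
    private int pointer = 0; // index of the slot the hand currently points at

    SimpleTimingWheel(int wheelSize) {
        for (int i = 0; i < wheelSize; i++) {
            slots.add(new ArrayList<>());
        }
    }

    // Schedule a task to fire after `delayTicks` ticks (1..wheelSize)
    void add(String task, int delayTicks) {
        if (delayTicks < 1 || delayTicks > slots.size()) {
            // a hierarchical wheel would hand longer delays to the next-level wheel
            throw new IllegalArgumentException("delay out of range: " + delayTicks);
        }
        slots.get((pointer + delayTicks) % slots.size()).add(task);
    }

    // Advance the hand by one slot and return the tasks that are now due
    List<String> tick() {
        pointer = (pointer + 1) % slots.size();
        List<String> slot = slots.get(pointer);
        List<String> due = new ArrayList<>(slot);
        slot.clear();
        return due;
    }
}
```

Adding and firing are both O(1) per task regardless of how many tasks are pending, which is the main advantage over keeping every task in a priority queue ordered by deadline.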
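2. The xxl-job implementation
- The xxl-job time ring only holds jobs due within the next 5s, stored in a Map<Integer, List<Integer>>;
- The map key is the trigger time's second modulo 60, and the value is the list of jobIds due in that second;
- The ring thread runs once per second, takes the jobIdList from the ring, and triggers those jobs;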