Quartz: Scheduling Jobs Separately with Multiple Schedulers, Each with Its Own Thread Pool
2022-06-09 10:25:00 【LHT-2787】
Configuration: define multiple schedulers and their thread pools
import com.wipinfo.central.engine.constant.JobConstant;
import com.wipinfo.central.engine.job.MyJobFactory;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * @author echoo华地
 * @description Scheduled-job configuration
 * @date 2022/5/31 9:06
 */
@Configuration
@Slf4j
public class QuartzConfig {
/** Scheduler for data-cleanup jobs. */
@Bean(JobConstant.JOB_EXECUTOR_CLEAN)
public Scheduler cleanScheduler(@Qualifier("CleanSchedulerFactoryBean") SchedulerFactoryBean cleanSchedulerFactoryBean) {
return cleanSchedulerFactoryBean.getScheduler();
}
/** Scheduler for timed VM power-on/power-off jobs. */
@Bean(JobConstant.JOB_EXECUTOR_TIMING)
public Scheduler timingScheduler(@Qualifier("TimingSchedulerFactoryBean") SchedulerFactoryBean timingSchedulerFactoryBean) {
return timingSchedulerFactoryBean.getScheduler();
}
/** Scheduler for data-upload jobs. */
@Bean(JobConstant.JOB_EXECUTOR_UPLOAD)
public Scheduler uploadScheduler(@Qualifier("UploadSchedulerFactoryBean") SchedulerFactoryBean uploadSchedulerFactoryBean) {
return uploadSchedulerFactoryBean.getScheduler();
}
/** Scheduler for data-statistics jobs. */
@Bean(JobConstant.JOB_EXECUTOR_CALCULATE)
public Scheduler calScheduler(@Qualifier("CalSchedulerFactoryBean") SchedulerFactoryBean calSchedulerFactoryBean) {
return calSchedulerFactoryBean.getScheduler();
}
/** Scheduler factory for timed VM power-on/power-off jobs. */
@Bean("TimingSchedulerFactoryBean")
public SchedulerFactoryBean timingSchedulerFactoryBean() {
ThreadPoolTaskExecutor threadPoolExecutor = createThreadPoolExecutor(1, 2, 4, 60);
return createSchedulerFactoryBean(JobConstant.JOB_EXECUTOR_TIMING, 5, false, threadPoolExecutor, jobFactory());
}
/** Scheduler factory for data-upload jobs. */
@Bean("UploadSchedulerFactoryBean")
public SchedulerFactoryBean uploadSchedulerFactoryBean() {
ThreadPoolTaskExecutor threadPoolExecutor = createThreadPoolExecutor(1, 2, 4, 60);
return createSchedulerFactoryBean(JobConstant.JOB_EXECUTOR_UPLOAD, 5, false, threadPoolExecutor, jobFactory());
}
/** Scheduler factory for data-statistics jobs. */
@Bean("CalSchedulerFactoryBean")
public SchedulerFactoryBean calSchedulerFactoryBean() {
ThreadPoolTaskExecutor threadPoolExecutor = createThreadPoolExecutor(1, 2, 4, 60);
return createSchedulerFactoryBean(JobConstant.JOB_EXECUTOR_CALCULATE, 5, false, threadPoolExecutor, jobFactory());
}
/** Scheduler factory for data-cleanup jobs. */
@Bean("CleanSchedulerFactoryBean")
public SchedulerFactoryBean cleanSchedulerFactoryBean() {
ThreadPoolTaskExecutor threadPoolExecutor = createThreadPoolExecutor(1, 2, 4, 60);
return createSchedulerFactoryBean(JobConstant.JOB_EXECUTOR_CLEAN, 5, false, threadPoolExecutor, jobFactory());
}
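/**
 * @param name               scheduler name
 * @param delay              startup delay, in seconds
 * @param waitForJob         whether to wait for running jobs to finish before shutting down
 * @param threadPoolExecutor thread pool used by this scheduler
 * @param jobFactory         factory that creates the job instances
 * @return org.springframework.scheduling.quartz.SchedulerFactoryBean
 * @description Creates a scheduler factory
 * @author echoo华地
 * @date 2022/5/31 9:04
 */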
private SchedulerFactoryBean createSchedulerFactoryBean(String name, Integer delay, boolean waitForJob, ThreadPoolTaskExecutor threadPoolExecutor, MyJobFactory jobFactory) {
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
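log.info("Creating SchedulerFactoryBean [{}] for scheduler [{}]", schedulerFactoryBean, name);
schedulerFactoryBean.setSchedulerName(name); // scheduler name
schedulerFactoryBean.setStartupDelay(delay); // delayed start, in seconds
schedulerFactoryBean.setWaitForJobsToCompleteOnShutdown(waitForJob); // wait for running jobs to finish before shutting down
// schedulerFactoryBean.setDataSource(); // optional: supply a DataSource to persist jobs in a database; without one, jobs are kept in memory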
schedulerFactoryBean.setJobFactory(jobFactory);
schedulerFactoryBean.setTaskExecutor(threadPoolExecutor);
return schedulerFactoryBean;
}
/**
 * @param corePoolSize     number of core threads
 * @param maxPoolSize      maximum number of threads
 * @param queueCapacity    queue capacity
 * @param keepAliveSeconds idle time before a non-core thread is released, in seconds
 * @return org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
 * @description Creates a thread pool
 * @author echoo华地
 * @date 2022/5/31 8:54
 */
private static ThreadPoolTaskExecutor createThreadPoolExecutor(Integer corePoolSize, Integer maxPoolSize, Integer queueCapacity, Integer keepAliveSeconds) {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds); // Default is 60
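// AbortPolicy is the default rejection policy and throws an exception; DiscardPolicy silently drops the task instead
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
threadPoolTaskExecutor.initialize(); // initialize manually, because this executor is not a Spring-managed bean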
return threadPoolTaskExecutor;
}
@Bean
public MyJobFactory jobFactory() {
return new MyJobFactory();
}
}
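Each scheduler above gets a pool built with createThreadPoolExecutor(1, 2, 4, 60): one core thread, at most two threads, and a queue of four. The following is a minimal sketch of what those numbers mean in practice. It uses the plain JDK ThreadPoolExecutor rather than Spring's ThreadPoolTaskExecutor, with an ArrayBlockingQueue standing in for the bounded queue; the class name PoolCapacityDemo and the method countRejected are made up for illustration. How Quartz itself reacts to a rejected execution is not covered here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolCapacityDemo {

    /**
     * Submits the given number of blocking tasks to a pool sized like the one
     * in the article (core 1, max 2, queue 4, AbortPolicy) and returns how
     * many submissions were rejected.
     */
    public static int countRejected(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4),
                new ThreadPoolExecutor.AbortPolicy());
        // Every task waits on this latch, so no slot frees up while we submit.
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> await(release));
            } catch (RejectedExecutionException e) {
                rejected++; // AbortPolicy: both threads busy and the queue is full
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return rejected;
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected with 6 tasks: " + countRejected(6)); // 0
        System.out.println("rejected with 8 tasks: " + countRejected(8)); // 2
    }
}
```

With these sizes a pool holds at most six unfinished tasks (two running plus four queued). The second thread is only created once the queue is full, so a scheduler whose jobs regularly overlap may need a larger queue or a higher maximum.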
Scheduler manager: define the operations you need
import com.wipinfo.central.engine.constant.JobConstant;
import com.wipinfo.central.engine.entity.QuartzTask;
import com.wipinfo.central.engine.enums.JobStatusEnums;
import com.wipinfo.central.engine.enums.JobTypeEnums;
import com.wipinfo.central.engine.service.IQuartzService;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.List;
/**
 * @author echoo华地
 * @description Scheduler manager
 * @date 2022/5/30 13:47
 */
@Slf4j
@Component
public class SchedulerManager {
@Resource(name = JobConstant.JOB_EXECUTOR_TIMING)
private Scheduler timingScheduler;
@Resource(name = JobConstant.JOB_EXECUTOR_UPLOAD)
private Scheduler uploadScheduler;
@Resource(name = JobConstant.JOB_EXECUTOR_CALCULATE)
private Scheduler calculateScheduler;
@Autowired
@Qualifier(JobConstant.JOB_EXECUTOR_CLEAN)
private Scheduler cleanScheduler;
@Autowired
private IQuartzService quartzService;
/** Schedules all jobs. */
public void schedulerAllJobs() {
scheduleTimingJobs();
scheduleCalculateJobs();
scheduleUploadJobs();
// scheduleCleanJobs();
}
/** Schedules the data-upload jobs. */
public void scheduleUploadJobs() {
log.info("Starting the data-upload job scheduler");
scheduleUploadJobFromDb(uploadScheduler);
}
/** Schedules the data-statistics jobs. */
public void scheduleCalculateJobs() {
log.info("Starting the data-statistics job scheduler");
scheduleCalculateJobFromDb(calculateScheduler);
}
/** Schedules the data-cleanup jobs. */
public void scheduleCleanJobs() {
log.info("Starting the data-cleanup job scheduler");
scheduleCleanJobFromDb(cleanScheduler);
}
/** Schedules the timed power-on/power-off jobs. */
public void scheduleTimingJobs() {
log.info("Starting the timed power-on/power-off job scheduler");
scheduleTimingJobFromDb(timingScheduler);
}
/** Loads the data-upload jobs from the database and schedules those with status 0. */
private void scheduleUploadJobFromDb(Scheduler scheduler) {
List<QuartzTask> tasks = quartzService.selectJobsByGroup(JobConstant.JOB_GROUP_UPLOAD);
log.info("Current number of data-upload jobs: {}", tasks.size());
tasks.stream().filter(task ->
task.getStatus() == 0
).forEach(task -> {
try {
schedulerJob(task, scheduler);
} catch (Exception e) {
log.error("Failed to schedule data-upload job [{}]; error: [{}]", task.getJobName(), e.getMessage());
}
});
}
/** Loads the data-statistics jobs from the database and schedules those with status 0. */
private void scheduleCalculateJobFromDb(Scheduler scheduler) {
List<QuartzTask> tasks = quartzService.selectJobsByGroup(JobConstant.JOB_GROUP_CALCULATE);
log.info("Current number of data-statistics jobs: {}", tasks.size());
tasks.stream().filter(task ->
task.getStatus() == 0
).forEach(task -> {
try {
schedulerJob(task, scheduler);
} catch (Exception e) {
log.error("Failed to schedule data-statistics job [{}]; error: [{}]", task.getJobName(), e.getMessage());
}
});
}
/** Loads the data-cleanup jobs from the database and schedules those with status 0. */
private void scheduleCleanJobFromDb(Scheduler scheduler) {
List<QuartzTask> tasks = quartzService.selectJobsByGroup(JobConstant.JOB_GROUP_CLEAN);
log.info("Current number of data-cleanup jobs: {}", tasks.size());
tasks.stream().filter(task ->
task.getStatus() == 0
).forEach(task -> {
try {
schedulerJob(task, scheduler);
} catch (Exception e) {
log.error("Failed to schedule data-cleanup job [{}]; error: [{}]", task.getJobName(), e.getMessage());
}
});
}
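/** Loads the timed power-on/power-off jobs from the database and schedules them. */
private void scheduleTimingJobFromDb(Scheduler scheduler) {
// Query the jobs in the timed power-on/power-off group
List<QuartzTask> tasks = quartzService.selectJobsByGroup(JobConstant.JOB_GROUP_TIMING);
log.info("Current number of timed power-on/power-off jobs: {}", tasks.size());
// Register every timed job with the scheduler
tasks.forEach(task -> {
try {
schedulerJob(task, scheduler);
} catch (Exception e) {
// getMessage() may return null, so guard before calling contains()
String message = e.getMessage();
if (message != null && message.contains("will never fire")) {
// The trigger has no future fire time: mark the job as expired
quartzService.updateStatus(task.getId(), JobStatusEnums.ALREADY_EXECUTE.getStatus());
log.warn("Timed job [{}] has expired", task.getJobName());
} else {
log.error("Failed to schedule timed job [{}]; error: [{}]", task.getJobName(), message);
}
}
});
}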
/**
 * @param schedulerName name of the scheduler to reload
 * @description Reloads the jobs of the given scheduler
 * @author echoo华地
 * @date 2022/5/31 10:00
 */
public void rescheduleJobs(String schedulerName) {
log.info("Reloading the jobs of scheduler [{}]", schedulerName);
try {
switch (schedulerName) {
case JobConstant.JOB_EXECUTOR_TIMING:
if (clearSchedulerJobs(timingScheduler))
scheduleTimingJobFromDb(timingScheduler);
break;
case JobConstant.JOB_EXECUTOR_UPLOAD:
if (clearSchedulerJobs(uploadScheduler))
scheduleUploadJobFromDb(uploadScheduler);
break;
case JobConstant.JOB_EXECUTOR_CALCULATE:
if (clearSchedulerJobs(calculateScheduler))
scheduleCalculateJobFromDb(calculateScheduler);
break;
case JobConstant.JOB_EXECUTOR_CLEAN:
if (clearSchedulerJobs(cleanScheduler))
scheduleCleanJobFromDb(cleanScheduler);
break;
default:
log.error("Failed to reload scheduler jobs; error: [scheduler {} does not exist]", schedulerName);
}
} catch (Exception e) {
log.error("Failed to reload scheduled jobs; error: [{}]", e.getMessage());
}
}
/** Removes all jobs from the given scheduler; returns false if clearing fails. */
private boolean clearSchedulerJobs(Scheduler scheduler) {
try {
log.info("Clearing all jobs of scheduler [{}]", scheduler.getSchedulerName());
scheduler.clear();
return true;
} catch (SchedulerException e) {
log.error("Failed to clear scheduler jobs; error: [{}]", e.getMessage());
return false;
}
}
/**
 * @param task      a single job definition
 * @param scheduler the scheduler to add it to
 * @description Adds a job to a scheduler
 * @author echoo华地
 * @date 2022/5/30 17:12
 */
private void schedulerJob(QuartzTask task, Scheduler scheduler) throws SchedulerException {
JobKey jobKey = new JobKey(task.getJobName(), task.getJobGroup()); // unique identifier of the job
Integer type = task.getType();
Class<? extends Job> jobClass = getJobClass(type);
if (jobClass == null) {
// Stop here instead of building a JobDetail without a job class
throw new SchedulerException("Scheduling failed: unknown job type [" + type + "]");
}
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobKey) // job class and job data
.usingJobData(JobConstant.JOB_DATA_JOB_ID, task.getId())
.usingJobData(JobConstant.JOB_DATA_JOB_TYPE, task.getType())
.usingJobData(JobConstant.JOB_DATA_JOB_NAME, task.getJobName()).build();
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCron());
// Create a trigger from the cron expression, named after the job
CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(task.getJobName(),
task.getJobGroup()).withSchedule(scheduleBuilder).build();
if (scheduler.checkExists(jobKey)) {
// The job is already registered with the scheduler: delete it first
scheduler.deleteJob(jobKey);
}
// Add the job to the scheduler
scheduler.scheduleJob(jobDetail, cronTrigger);
}
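private Class<? extends Job> getJobClass(Integer jobType) {
Class<?> jobClass = JobTypeEnums.getJobClassByType(jobType);
// Return null when the type is unknown or the class does not implement Job
if (jobClass == null || !Job.class.isAssignableFrom(jobClass)) {
return null;
}
// asSubclass avoids the unchecked cast
return jobClass.asSubclass(Job.class);
}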
}
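rescheduleJobs maps a scheduler name to a "clear, then reload from the database" action through a switch, with one hand-written branch per scheduler. One possible alternative, sketched here under assumptions, is a lookup table that registers each action once, so a branch cannot end up pointing at the wrong scheduler. The class ReloadRegistry and its methods are hypothetical and not part of the original code; a Runnable stands in for the clearSchedulerJobs plus scheduleXxxJobFromDb pair.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReloadRegistry {

    // Scheduler name -> action that clears the scheduler and reloads its jobs.
    private final Map<String, Runnable> reloaders = new LinkedHashMap<>();

    /** Registers the reload action for one scheduler name. */
    public void register(String schedulerName, Runnable reloader) {
        reloaders.put(schedulerName, reloader);
    }

    /** Runs the action for the given name; returns false if the name is unknown. */
    public boolean reload(String schedulerName) {
        Runnable reloader = reloaders.get(schedulerName);
        if (reloader == null) {
            return false; // corresponds to the default branch of the switch
        }
        reloader.run();
        return true;
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        ReloadRegistry registry = new ReloadRegistry();
        // In the real manager each action would clear a scheduler and reload it.
        registry.register("timing", () -> calls.add("timing reloaded"));
        registry.register("clean", () -> calls.add("clean reloaded"));

        System.out.println(registry.reload("clean"));   // true
        System.out.println(registry.reload("unknown")); // false
        System.out.println(calls);                      // [clean reloaded]
    }
}
```

In SchedulerManager the registrations could be done once after injection, using the JobConstant.JOB_EXECUTOR_* names as keys.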