当前位置:网站首页>实战:10 种实现延迟任务的方法,附代码!
实战:10 种实现延迟任务的方法,附代码!
2022-08-04 15:10:00 【InfoQ】
什么是延迟任务?
- 红包 24 小时未被查收,需要延迟执退还业务;
- 每个月账单日,需要给用户发送当月的对账单;
- 订单下单之后 30 分钟后,用户如果没有付钱,系统需要自动取消订单。
延迟任务实现思路分析
- 自己手写一个“死循环”一直判断当前时间节点有没有要执行的任务;
- 借助 JDK 或者第三方提供的工具类来实现延迟任务。
延迟任务实现
1.无限循环实现延迟任务
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
* 延迟任务执行方法汇总
*/
public class DelayTaskExample {
// 存放定时任务
private static Map<String, Long> _TaskMap = new HashMap<>();
public static void main(String[] args) {
System.out.println("程序启动时间:" + LocalDateTime.now());
// 添加定时任务
_TaskMap.put("task-1", Instant.now().plusSeconds(3).toEpochMilli()); // 延迟 3s
// 调用无限循环实现延迟任务
loopTask();
}
/**
* 无限循环实现延迟任务
*/
public static void loopTask() {
Long itemLong = 0L;
while (true) {
Iterator it = _TaskMap.entrySet().iterator();
while (it.hasNext()) {
Map.Entry entry = (Map.Entry) it.next();
itemLong = (Long) entry.getValue();
// 有任务需要执行
if (Instant.now().toEpochMilli() >= itemLong) {
// 延迟任务,业务逻辑执行
System.out.println("执行任务:" + entry.getKey() +
" ,执行时间:" + LocalDateTime.now());
// 删除任务
_TaskMap.remove(entry.getKey());
}
}
}
}
}
2.Java API 实现延迟任务
① ScheduledExecutorService 实现延迟任务
public class DelayTaskExample {
public static void main(String[] args) {
System.out.println("程序启动时间:" + LocalDateTime.now());
scheduledExecutorServiceTask();
}
/**
* ScheduledExecutorService 实现固定频率一直循环执行任务
*/
public static void scheduledExecutorServiceTask() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
// 执行任务的业务代码
System.out.println("执行任务" +
" ,执行时间:" + LocalDateTime.now());
}
},
2, // 初次执行间隔
2, // 2s 执行一次
TimeUnit.SECONDS);
}
}
② DelayQueue 实现延迟任务
public class DelayTest {
public static void main(String[] args) throws InterruptedException {
DelayQueue delayQueue = new DelayQueue();
// 添加延迟任务
delayQueue.put(new DelayElement(1000));
delayQueue.put(new DelayElement(3000));
delayQueue.put(new DelayElement(5000));
System.out.println("开始时间:" + DateFormat.getDateTimeInstance().format(new Date()));
while (!delayQueue.isEmpty()){
// 执行延迟任务
System.out.println(delayQueue.take());
}
System.out.println("结束时间:" + DateFormat.getDateTimeInstance().format(new Date()));
}
static class DelayElement implements Delayed {
// 延迟截止时间(单面:毫秒)
long delayTime = System.currentTimeMillis();
public DelayElement(long delayTime) {
this.delayTime = (this.delayTime + delayTime);
}
@Override
// 获取剩余时间
public long getDelay(TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
// 队列里元素的排序依据
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
} else {
return 0;
}
}
@Override
public String toString() {
return DateFormat.getDateTimeInstance().format(new Date(delayTime));
}
}
}
3.Redis 实现延迟任务
① 通过数据判断的方式
import redis.clients.jedis.Jedis;
import utils.JedisUtils;
import java.time.Instant;
import java.util.Set;
public class DelayQueueExample {
// zset key
private static final String _KEY = "myDelayQueue";
public static void main(String[] args) throws InterruptedException {
Jedis jedis = JedisUtils.getJedis();
// 延迟 30s 执行(30s 后的时间)
long delayTime = Instant.now().plusSeconds(30).getEpochSecond();
jedis.zadd(_KEY, delayTime, "order_1");
// 继续添加测试数据
jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");
jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");
jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");
jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");
// 开启延迟队列
doDelayQueue(jedis);
}
/**
* 延迟队列消费
* @param jedis Redis 客户端
*/
public static void doDelayQueue(Jedis jedis) throws InterruptedException {
while (true) {
// 当前时间
Instant nowInstant = Instant.now();
long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // 上一秒时间
long nowSecond = nowInstant.getEpochSecond();
// 查询当前时间的所有任务
Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);
for (String item : data) {
// 消费任务
System.out.println("消费:" + item);
}
// 删除已经执行的任务
jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);
Thread.sleep(1000); // 每秒轮询一次
}
}
}
② 通过键空间通知
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import utils.JedisUtils;
public class TaskExample {
public static final String _TOPIC = "[email protected]__:expired"; // 订阅频道名称
public static void main(String[] args) {
Jedis jedis = JedisUtils.getJedis();
// 执行定时任务
doTask(jedis);
}
/**
* 订阅过期消息,执行定时任务
* @param jedis Redis 客户端
*/
public static void doTask(Jedis jedis) {
// 订阅过期消息
jedis.psubscribe(new JedisPubSub() {
@Override
public void onPMessage(String pattern, String channel, String message) {
// 接收到消息,执行定时任务
System.out.println("收到消息:" + message);
}
}, _TOPIC);
}
}
4.Netty 实现延迟任务
<!-- https://mvnrepository.com/artifact/io.netty/netty-common -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.48.Final</version>
</dependency>
public class DelayTaskExample {
public static void main(String[] args) {
System.out.println("程序启动时间:" + LocalDateTime.now());
NettyTask();
}
/**
* 基于 Netty 的延迟任务
*/
private static void NettyTask() {
// 创建延迟任务实例
HashedWheelTimer timer = new HashedWheelTimer(3, // 时间间隔
TimeUnit.SECONDS,
100); // 时间轮中的槽数
// 创建一个任务
TimerTask task = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("执行任务" +
" ,执行时间:" + LocalDateTime.now());
}
};
// 将任务添加到延迟队列中
timer.newTimeout(task, 0, TimeUnit.SECONDS);
}
}
5.MQ 实现延迟任务
- 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;
- 使用 rabbitmq-delayed-message-exchange 插件实现延迟功能。
import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedConfig {
final static String QUEUE_NAME = "delayed.goods.order";
final static String EXCHANGE_NAME = "delayedec";
@Bean
public Queue queue() {
return new Queue(DelayedConfig.QUEUE_NAME);
}
// 配置默认的交换机
@Bean
CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
//参数二为类型:必须是x-delayed-message
return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
// 绑定队列到交换器
@Bean
Binding binding(Queue queue, CustomExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
}
}
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class DelayedSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String msg) {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("发送时间:" + sf.format(new Date()));
rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("x-delay", 3000);
return message;
}
});
}
}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {
@RabbitHandler
public void process(String msg) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("接收时间:" + sdf.format(new Date()));
System.out.println("消息内容:" + msg);
}
}
import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.text.SimpleDateFormat;
import java.util.Date;
@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {
@Autowired
private DelayedSender sender;
@Test
public void Test() throws InterruptedException {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
sender.send("Hi Admin.");
Thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试
}
}
6.使用 Spring 定时任务
@SpringBootApplication
@EnableScheduling
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
@Component
public class ScheduleJobs {
@Scheduled(fixedDelay = 2 * 1000)
public void fixedDelayJob() throws InterruptedException {
System.out.println("任务执行,时间:" + LocalDateTime.now());
}
}
7.Quartz 实现延迟任务
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.time.LocalDateTime;
public class SampleJob extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext)
throws JobExecutionException {
System.out.println("任务执行,时间:" + LocalDateTime.now());
}
}
import org.quartz.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SampleScheduler {
@Bean
public JobDetail sampleJobDetail() {
return JobBuilder.newJob(SampleJob.class).withIdentity("sampleJob")
.storeDurably().build();
}
@Bean
public Trigger sampleJobTrigger() {
// 3s 后执行
SimpleScheduleBuilder scheduleBuilder =
SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).withRepeatCount(1);
return TriggerBuilder.newTrigger().forJob(sampleJobDetail()).withIdentity("sampleTrigger")
.withSchedule(scheduleBuilder).build();
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
/**
* SpringBoot 项目启动后执行
*/
public class MyStartupRunner implements CommandLineRunner {
@Autowired
private SchedulerFactoryBean schedulerFactoryBean;
@Autowired
private SampleScheduler sampleScheduler;
@Override
public void run(String... args) throws Exception {
// 启动定时任务
schedulerFactoryBean.getScheduler().scheduleJob(
sampleScheduler.sampleJobTrigger());
}
}
总结
- 手动无线循环;
- ScheduledExecutorService;
- DelayQueue;
- Redis zset 数据判断的方式;
- Redis 键空间通知的方式;
- Netty 提供的 HashedWheelTimer 工具类;
- RabbitMQ 死信队列;
- RabbitMQ 延迟消息插件 rabbitmq-delayed-message-exchange;
- Spring Scheduled;
- Quartz。
边栏推荐
- Hangzhou electric the competition team arrangement (ACM)
- 宣传海报
- 杭电校赛(ACM组队安排)
- 特殊品种的二次开户验资金额
- Technology sharing | Description of the electronic fence function in the integrated dispatching system
- leetcode:254. 因子的组合
- 365天挑战LeetCode1000题——Day 049 非递增顺序的最小子序列 贪心
- leetcode:212. 单词搜索 II
- Google plug-in. Download contents file is automatically deleted after solution
- leetcode: 241. Designing precedence for arithmetic expressions
猜你喜欢
随机推荐
leetcode: 254. Combinations of factors
Legal education combined with VR panorama, intuitively feel and learn the spirit of the rule of law
Resharper 如何把类里的类移动到其他文件
洛谷题解P4326 求圆的面积
Roslyn 通过 nuget 统一管理信息
从-99打造Sentinel高可用集群限流中间件
Unity AR阴影投射透明地面 仅渲染模型实时阴影 Shader实现
QT笔记——Q_INVOKABLE了解
游戏网络 UDP+FEC+KCP
leetcode:253. 至少需要多少间会议室
HarePoint Analytics for SharePoint Online
Game network UDP + FEC + KCP
小程序|炎炎夏日、清爽一夏、头像大换装
This week to discuss the user experience: Daedalus Nemo to join Ambire, explore the encryption of the ocean
leetcode:251. 展开二维向量
李沐的深度学习笔记来了!
FRED应用:毛细管电泳系统
杭电校赛(ACM组队安排)
Bluetooth Technology|In the first half of the year, 1.3 million charging piles were added nationwide, and Bluetooth charging piles will become the mainstream of the market
技术分享| 小程序实现音视频通话