当前位置:网站首页>Redis(十二) - Redis消息队列
Redis(十二) - Redis消息队列
2022-08-02 05:13:00 【Super_Leng】
文章目录
一、Redis消息队列
由上一章内容可知,基于阻塞队列的异步秒杀还存在2个问题:
- 内存限制问题(如果不对BlockingQueue做大小限制,则会有内存溢出问题)
- 数据安全问题(如果服务宕机,则内存的数据将会丢失)
这一章通过Redis的消息队列进行优化
1. 消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
Redis提供了三种不同的方式来实现消息队列:
- list结构:基于List结构模拟消息队列
- PubSub:基本的点对点消息模型
- Stream:比较完善的消息队列模型
2. 基于List结构模拟消息队列
- Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
- 队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。
- 注意,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。
基于List的消息队列有哪些优缺点?
优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失(取到消息还没处理并且出现异常,则消息会丢失)
- 只支持单消费者(一个线程取到消息后,list中将移除该消息,其他线程则获取不到)
3. 基于PubSub的消息队列
PubSub(发布订阅) 是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
- SUBSCRIBE channel [channel] :订阅一个或多个频道
- PUBLISH channel msg :向一个频道发送消息
- PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
基于PubSub的消息队列有哪些优缺点?
优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化(与list不同的时,list本身是数据结构可以存储数据,而PubSub只支持消息发送)
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失(消费者来不及处理的消息,会存到消费者的缓冲区,缓冲区是有大小限制的)
4. 基于Stream的消息队列 - 单消费模式
- Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
发送消息的命令:
例如:
## 创建名为 users 的队列,并向其中发送一个消息,内容是:{
name=jack,age=21},并且使用Redis自动生成ID
127.0.0.1:6379> XADD users * name jack age 21
"1644805700523-0"
读取消息的方式之一:XREAD
例如,使用XREAD读取第一个消息:
XREAD阻塞方式,读取最新的消息:
在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:
STREAM类型消息队列的XREAD命令特点:
- 消息可回溯(消息不丢失,读取后消息仍然存在队列中)
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
4. 基于Stream的消息队列 - 消费者组
- 由于单消费模式会出现消息漏读的情况,所以出现了消费者组
- 消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
创建消费者组:
# key:队列名称
# groupName:消费者组名称
# ID:起始ID标识,$代表队列中最后一个消息,0则代表队列中第一个消息
# MKSTREAM:队列不存在时自动创建队列
XGROUP CREATE key groupName ID [MKSTREAM]
其它常见命令:
# 删除指定的消费者组
XGROUP DESTORY key groupName
# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername
# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
从消费者组读取消息:
# group:消费组名称
# consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
# count:本次查询的最大数量
# BLOCK milliseconds:当没有消息时最长等待时间
# NOACK:无需手动ACK,获取到消息后自动确认,自动ACK可能会出现消息丢失,所以一般需要手动ACK
# STREAMS key:指定队列名称
# ID:获取消息的起始ID:
">":从下一个未消费的消息开始
其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始,消费完并确认后,则会从pending-list中移除
正常情况通过">"读取消息,出现异常情况后,再从pending-list中读取
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
队列中有4个消息k1、k2、k3、k4
- 创建队列s1,消费者组g1
- 消费者组g1中的消费者c1先读2个消息k1、k2
- 消费者组g1中的消费者c1再读2个消息k3、k4
消息确认命令:
127.0.0.1:6379> help XACK
XACK key group ID [ID ...]
summary: Marks a pending message as correctly processed, effectively removing it from the pending entries list of the consumer group. Return value of the command is the number of messages successfully acknowledged, that is, the IDs we were actually able to resolve in the PEL.
since: 5.0.0
group: stream
一次性确认消费者组g1中的消息:
127.0.0.1:6379> XACK s1 g1 1646339018049-0 1646339342815-0 1646339529899-0 1646339537593-0
(integer)4
127.0.0.1:6379>
从pending-list中读取消息的命令:
127.0.0.1:6379> help XPENDING
# [IDLE min-idle-time]表示 获取消息以后 到 确认消息之前 的这段空闲时间
# start end 表示从start 到 end 之间的id 消息
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
summary: Return information and entries from a stream consumer group pending entries list, that are messages fetched but never acknowledged.
since: 5.0.0
group: stream
- “- +” 表示从最小到最大id之间的消息,10表示消息数量
- 4)(integer)1 表示pending-list中有1个消息未被确认
消费者监听消息的基本思路:
STREAM类型消息队列的XREADGROUP命令特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
二、基于Redis的Stream结构作为消息队列,实现异步秒杀下单
需求:
- 创建一个Stream类型的消息队列,名为stream.orders
- 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
- 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
先提前创建stream.orders队列:
127.0.0.1:6379> XGROUP CREATE stream.orders g1 0 MKSTREAM
OK
127.0.0.1:6379>
优化后的Lua脚本:
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]
-- 2.数据key
-- 2.1.库存key,2个。。表示拼接字符串
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
-- tonumber是将字符串转为数字
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
-- 'id', orderId,用id对应orderId,因为VoucherOrder实体类中是用id表示的orderId,这里也用id表示的orderId,解析Redis对象后可以少一次转换
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
优化后的代码:
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
// 创建阻塞队列
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
// 用线程池创建独立线程
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
// 将代理对象作为全局变量,供所有线程使用
private IVoucherOrderService proxy;
// 项目启动后,就应该开启线程,异步从阻塞队列中获取信息
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable{
private final String queueName = "stream.orders";
@Override
public void run() {
while (true) {
try {
// 0.初始化stream
initStream();
// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
// GROUP g1 c1
Consumer.from("g1", "c1"),
// BLOCK 2000
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
// STREAMS stream.orders >
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有消息,继续下一次循环
continue;
}
// 3.解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
// 将Redis对象转为VoucherOrder对象
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 4.读取消息成功后,创建订单
handleVoucherOrder(voucherOrder);
// 5.确认消息 XACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
// 出现异常后,从PendingList中读取消息
handlePendingList();
}
}
}
public void initStream(){
Boolean exists = stringRedisTemplate.hasKey(queueName);
if (BooleanUtil.isFalse(exists)) {
log.info("stream不存在,开始创建stream");
// 不存在,需要创建
stringRedisTemplate.opsForStream().createGroup(queueName, ReadOffset.latest(), "g1");
log.info("stream和group创建完毕");
return;
}
// stream存在,判断group是否存在
StreamInfo.XInfoGroups groups = stringRedisTemplate.opsForStream().groups(queueName);
if(groups.isEmpty()){
log.info("group不存在,开始创建group");
// group不存在,创建group
stringRedisTemplate.opsForStream().createGroup(queueName, ReadOffset.latest(), "g1");
log.info("group创建完毕");
}
}
private void handlePendingList() {
while (true) {
try {
// 1.获取PendingList中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1), // 不需要BLOCK
StreamOffset.create(queueName, ReadOffset.from("0")) // 从PendingList中0开始读取
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明PendingList没有消息,这里是结束循环,不再继续
break;
}
// 3.解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
// 将Redis对象转为VoucherOrder对象
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 4.读取消息成功后,创建订单
handleVoucherOrder(voucherOrder);
// 5.确认消息 XACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理PendingList中的订单异常", e);
}
}
}
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
// 1.获取用户,注意,这里是单独的线程,所以不能从主线程的ThreadLocal获取userId
Long userId = voucherOrder.getUserId();
// 2.创建锁对象
RLock redisLock = redissonClient.getLock("lock:order:" + userId);
// 3.尝试获取锁,这里加锁是兜底方案,可以不用再加锁,因为前面执行过lua脚本校验过一人一单
boolean isLock = redisLock.tryLock();
// 4.判断是否获得锁成功
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
log.error("不允许重复下单!");
return;
}
try {
// 注意:不能通过 IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); 获取代理对象
// 因为AopContext.currentProxy();底层内部也有个ThreadLocal,但是此时的线程是新开启的线程,所以不能获取不到主线程中的代理对象
// 所以需要在主线程中先获取到代理对象,保存到全局变量供所有线程使用
proxy.createVoucherOrder(voucherOrder);
} finally {
// 释放锁
redisLock.unlock();
}
}
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 3.获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 4.返回订单id
return Result.ok(orderId);
}
@Transactional
@Override
public void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
log.error("用户已经购买过了");
return;
}
// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
log.error("库存不足");
return;
}
// 7.创建订单
save(voucherOrder);
}
}
测试结果:
- 平均耗时和吞吐量有了明显提升
边栏推荐
- Point Density-Aware Voxels for LiDAR 3D Object Detection 论文笔记
- 虚拟现实房产展示系统提前预见未来装修效果
- 21天学习挑战赛安排
- MySQL implements sorting according to custom (specified order)
- 提高软件测试能力的方法有哪些?看完这篇文章让你提升一个档次
- What do interview test engineers usually ask?The test supervisor tells you
- 5款经典代码阅读器的使用方案对比
- 51单片机外设篇:ADC
- 无代码生产新模式探索
- Introduction to coredns
猜你喜欢
About the directory structure of the web application
[C language] LeetCode26. Delete duplicates in an ordered array && LeetCode88. Merge two ordered arrays
Redis集群模式
The company does not pay attention to software testing, and the new Ali P8 has written a test case writing specification for us
Block elements, inline elements (
elements, span elements)【漫画】2021满分程序员行为对照表(最新版)
51 microcontroller peripherals article: dot-matrix LCD
MySQL implements sorting according to custom (specified order)
复盘:图像饱和度计算公式和图像信噪(PSNR)比计算公式
跨桌面端Web容器演进
随机推荐
PSQL function, predicate, CASE expression and set operations
[PSQL] 函数、谓词、CASE表达式、集合运算
软件测试的需求人才越来越多,为什么大家还是不太愿意走软件测试的道路?
Introduction to Grid Layout
C语言中i++和++i在循环中的差异性
Point Density-Aware Voxels for LiDAR 3D Object Detection 论文笔记
保证家里和企业中的WIFI安全-附AC与AP组网实验
ATM系统
navicat connects to MySQL and reports an error: 1045 - Access denied for user 'root'@'localhost' (using password YES)
Redis database
nacos registry
Packaging and deployment of go projects
Meta公司内部项目-RaptorX:将Presto性能提升10倍
为什么4个字节的float要比8个字节的long大呢?
Introduction and use of apifox (1).
[C language] LeetCode26. Delete duplicates in an ordered array && LeetCode88. Merge two ordered arrays
Navicat报错:1045 -拒绝访问用户[email protected](使用passwordYES)
Three methods of importing sql files in MySQL
C 竞赛——捕鱼
使用TinkerPop框架对GDB增删改查