Don't Use a Redis List to Implement a Message Queue: Stream Is Designed for Queues
2022-06-11 09:23:00 【CSDN cloud computing】

author | Code byte
source | Code byte
Using a Redis List to implement a message queue has many limitations, such as:
There is no good ACK mechanism;
There is no concept of a consumer group (ConsumerGroup);
Messages pile up;
A List is a linear structure, so finding a specific element requires traversing the whole list.
Stream is a data type introduced in Redis 5.0 and designed specifically for message queues. A Stream is an ordered queue containing zero or more elements, and the elements are ordered by ID.
It implements most of the features of a message queue:
Sequential generation of message IDs;
Message traversal;
Blocking and non-blocking reads of messages;
Consumer groups (Consumer Groups);
An ACK acknowledgement mechanism;
Multicast support.
It provides many commands for operating on message queues and borrows the concept of Consumer Groups from Kafka to provide consumer group functionality.
It also provides message persistence and master-slave replication. A client can access the data at any time, and the position each client has read up to is remembered, which ensures that messages are not lost.
Enough talk, let's see how to use it. See the official website for details: https://redis.io/topics/streams-intro
XADD: Insert message
As Yunshan's last word fell, the tension that filled the air suddenly broke. The elders of the Yunlan Sect spread their wings and shot across the sky in pursuit of Xiao Yan.
Yunshan uses the following command to insert the order 「chase Xiao Yan」 into the queue, so that the elders can lead the disciples to carry it out.
XADD Yunlanzong * task kill name Xiaoyan
"1645936602161-0"
Each element in a Stream consists of key-value pairs, and different elements can contain different numbers of key-value pairs.
The syntax of the command is as follows:
XADD streamName id field value [field value ...]
The 「*」 after the message queue name tells Redis to automatically generate a unique ID for the inserted message. You can also specify the ID yourself.
A message ID is made up of two parts:
The current timestamp in milliseconds;
A sequence number, starting from 0, used to distinguish multiple messages generated within the same millisecond.
By tying element IDs to time and forcing the ID of a new element to be greater than the IDs of existing elements, Redis logically turns the stream into an append-only data structure.
This property is very important for users who implement message queues and event systems with streams:
Users can be sure that new messages and events only ever appear after existing ones, just as in the real world new events always happen after existing events. Everything stays in order.
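The ID rule above can be sketched in a few lines of Java. This is a simplified in-memory model, not Redis's actual implementation; the class name StreamIdGenerator and the method nextId are hypothetical names used only for illustration. The sketch takes a millisecond clock reading as the first part of the ID and bumps the sequence number whenever the clock has not advanced, so every new ID is strictly greater than the previous one.

```java
public class StreamIdGenerator {
    private long lastMs = -1;  // millisecond part of the last generated ID
    private long lastSeq = 0;  // sequence part of the last generated ID

    /** Returns an ID of the form "<milliseconds>-<sequence>" for the given clock reading. */
    public synchronized String nextId(long nowMs) {
        if (nowMs > lastMs) {
            // The clock advanced: start a new millisecond with sequence 0.
            lastMs = nowMs;
            lastSeq = 0;
        } else {
            // Same millisecond (or the clock went backwards):
            // keep the last timestamp and increment the sequence number.
            lastSeq++;
        }
        return lastMs + "-" + lastSeq;
    }

    public static void main(String[] args) {
        StreamIdGenerator gen = new StreamIdGenerator();
        System.out.println(gen.nextId(1645936602161L)); // 1645936602161-0
        System.out.println(gen.nextId(1645936602161L)); // 1645936602161-1
        System.out.println(gen.nextId(1645936602162L)); // 1645936602162-0
    }
}
```

Passing the clock reading in as a parameter keeps the sketch deterministic; a real caller would pass System.currentTimeMillis().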
XREAD: Read message
Old dog Yunling uses the following command to receive Yunshan's order:
XREAD COUNT 1 BLOCK 0 STREAMS Yunlanzong 0-0
1) 1) "Yunlanzong"
   2) 1) 1) "1645936602161-0"
         2) 1) "task"
            2) "kill"
            3) "name"
            4) "Xiaoyan"
The syntax of the command is as follows:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
This command can read from multiple streams at the same time. The parameters have the following meanings:
COUNT: the maximum number of elements to read from each stream;
BLOCK: blocking read. When the message queue has no messages, block and wait for the given timeout in milliseconds; 0 means wait indefinitely.
ID: the message ID. Reading starts from the message after the given ID; 0-0 means reading from the first element.
To consume sequentially with XREAD, remember the ID of the last message returned by each read and pass it as the ID argument of the next XREAD call. The next call then continues with the subsequent messages.
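The cursor pattern for sequential consumption can be illustrated without a Redis server. The following is a minimal in-memory sketch, assuming IDs of the form ms-seq; the class CursorReadSketch and its methods readAfter and drain are hypothetical names, and readAfter only imitates the "entries with an ID greater than the one given" behaviour of XREAD.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CursorReadSketch {
    // Orders IDs of the form "ms-seq" numerically: first by ms, then by seq.
    static final Comparator<String> BY_ID = Comparator
            .comparingLong((String id) -> Long.parseLong(id.split("-")[0]))
            .thenComparingLong(id -> Long.parseLong(id.split("-")[1]));

    private final TreeMap<String, String> entries = new TreeMap<>(BY_ID);

    public void add(String id, String payload) {
        entries.put(id, payload);
    }

    /** Imitates XREAD COUNT count STREAMS key lastId: entries with an ID strictly greater than lastId. */
    public List<Map.Entry<String, String>> readAfter(String lastId, int count) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> e : entries.tailMap(lastId, false).entrySet()) {
            if (out.size() == count) {
                break;
            }
            out.add(Map.entry(e.getKey(), e.getValue()));
        }
        return out;
    }

    /** Reads the whole stream batch by batch, carrying the last seen ID into the next call. */
    public List<String> drain(int count) {
        List<String> seen = new ArrayList<>();
        String cursor = "0-0"; // start from the first element
        while (true) {
            List<Map.Entry<String, String>> batch = readAfter(cursor, count);
            if (batch.isEmpty()) {
                break;
            }
            for (Map.Entry<String, String> e : batch) {
                seen.add(e.getValue());
                cursor = e.getKey(); // remember the last returned ID
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        CursorReadSketch stream = new CursorReadSketch();
        stream.add("1-0", "a");
        stream.add("1-1", "b");
        stream.add("2-0", "c");
        System.out.println(stream.drain(2)); // [a, b, c]
    }
}
```

Because the cursor lives in the client, nothing on the server side records which messages this reader has handled. That is the gap the consumer groups described later are meant to close.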
Sect Leader Yunyun, I only arrived at the Yunlan Sect today and have no use for historical messages. How do I receive only the messages published with XADD from the moment I start blocking on XREAD?
Running the command below is enough. The 「$」 at the end of the command means read only the newest messages in blocking mode; if there is nothing to read, the command waits forever.
While it is waiting, any message that other elders append to the queue is read immediately.
XREAD COUNT 1 BLOCK 0 STREAMS Yunlanzong $
Is implementing a message queue really that easy? What about the ACK mechanism that was promised?
That was just an appetizer. Data read through XREAD is not actually deleted: running XREAD COUNT 2 BLOCK 0 STREAMS Yunlanzong 0-0 again reads the same messages again.
So we still need an ACK mechanism.
Next, let's build a real message queue.
ConsumerGroup
The ConsumerGroup (consumer group) of Redis Stream lets users logically divide one stream into several different streams, each handled by the consumers of a ConsumerGroup.
It is a powerful persistent message queue that supports multicast. Redis Stream borrows its design from Kafka.
Stream high availability is built on master-slave replication and is no different from the replication mechanism of other data structures. In other words, Stream supports high availability in Sentinel and Cluster environments.

The structure of a Redis Stream is shown in the figure above. There is a linked list of messages, and every message has a unique ID and its corresponding content;
Messages are persisted;
The state of each consumer group is independent and does not affect the others. Messages in the same Stream are consumed by all consumer groups;
A consumer group can consist of multiple consumers, and the consumers compete with each other. Any consumer reading a message moves last_delivered_id forward;
Every consumer has a pending_ids variable that records the messages the consumer has read but not yet acknowledged. It is used to guarantee that a message is consumed by a client at least once.
A message queue built on consumer groups mainly involves the following three commands:
XGROUP creates, destroys and manages consumer groups.
XREADGROUP reads data from the stream through a consumer group.
XACK lets a consumer mark a pending message as correctly processed.
Create a consumer group
A Stream creates a consumer group (Consumer Group) with the XGROUP CREATE command. A starting message ID must be passed to initialize the last_delivered_id variable.
We use XADD to insert some messages into the bossStream queue:
XADD bossStream * name zhangsan age 26
XADD bossStream * name lisi age 2
XADD bossStream * name bigold age 40
The following commands create two consumer groups, 「Qinglongmen」 and 「Liushanmen」, for the message queue named bossStream.
# The syntax is as follows
# XGROUP CREATE stream group start_id [MKSTREAM]
XGROUP CREATE bossStream Qinglongmen 0-0 MKSTREAM
XGROUP CREATE bossStream Liushanmen 0-0 MKSTREAM
stream: the name of the queue;
group: the name of the consumer group;
start_id: the starting ID of the consumer group in the Stream. It determines the ID after which the consumer group starts reading messages. 0-0 means start reading from the first message; $ means start from the last message, so only new messages are received.
MKSTREAM: by default, XGROUP CREATE returns an error when the target stream does not exist. The optional MKSTREAM subcommand can be passed as the last argument to create the stream automatically.
Read message
Let consumer1 of the 「Qinglongmen」 consumer group read one message from bossStream in blocking mode:
XREADGROUP GROUP Qinglongmen consumer1 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
   2) 1) 1) "1645957821396-0"
         2) 1) "name"
            2) "zhangsan"
            3) "age"
            4) "26"
The syntax is as follows:
XREADGROUP GROUP groupName consumerName [COUNT n] [BLOCK ms] STREAMS streamName [stream ...] id [id ...]
Parameters in [] are optional. The command is largely the same as XREAD; the difference is the new GROUP groupName consumerName option.
The two arguments of this option specify the consumer group to read through and the consumer responsible for processing the messages.
Among the other parameters:
>: used as the last argument of the command, > means start reading from messages that have not yet been delivered to any consumer in the group;
BLOCK: blocking read;
Key point
Once a message in the queue has been consumed by one consumer in a consumer group, it is no longer read by the other consumers in that group.
For example, consumer2 performs a read:
XREADGROUP GROUP Qinglongmen consumer2 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
   2) 1) 1) "1645957838700-0"
         2) 1) "name"
            2) "lisi"
            3) "age"
            4) "2"
consumer2 can no longer read zhangsan. It reads the next message, lisi, because zhangsan has already been read by consumer1.
Another purpose of consumer groups is to let multiple consumers in a group share the work of reading messages: each consumer reads part of the messages, which achieves load balancing.
For example, if a consumer group has three consumers C1, C2 and C3 and the stream contains messages 1, 2, 3, 4, 5, 6 and 7, the seven messages are split among the three consumers and each message is delivered to only one of them.

XPENDING: view messages that were read but not acknowledged
To make sure messages can still be read after a consumer fails or goes down during consumption, a Stream keeps an internal queue (the pending list) of the messages each consumer has read but not yet acknowledged with ACK.
If a consumer reads a message with XREADGROUP GROUP groupName consumerName but does not send an XACK command to the Stream, the message stays in the pending list.
For example, to view the read but unacknowledged messages of each consumer in the 「Qinglongmen」 consumer group of bossStream:
XPENDING bossStream Qinglongmen
1) (integer) 2
2) "1645957821396-0"
3) "1645957838700-0"
4) 1) 1) "consumer1"
      2) "1"
   2) 1) "consumer2"
      2) "1"
1) is the number of unacknowledged messages;
2) ~ 3) are the smallest and largest IDs among the messages read by all consumers in Qinglongmen;
4) lists each consumer together with its number of pending messages.
To see which messages consumer1 has read, use the following command:
XPENDING bossStream Qinglongmen - + 10 consumer1
1) 1) "1645957821396-0"
   2) "consumer1"
   3) (integer) 3758384
   4) (integer) 1
Each entry shows the message ID, the consumer that owns it, the time in milliseconds since it was last delivered, and the number of times it has been delivered.
ACK confirmation
So once a message has been received and consumed successfully, we need to send an ACK to the Stream manually. The message is then removed from the consumer group's pending list (it remains in the stream itself). The command is as follows:
XACK bossStream Qinglongmen 1645957821396-0 1645957838700-0
(integer) 2
The syntax is as follows:
XACK key group ID [ID ...]
Consumer acknowledgement increases message reliability. Usually the ack is executed after business processing has finished, to confirm that the message has been consumed. The whole process is shown in the figure below.
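The read, pending, acknowledge cycle described above can be modelled in memory to make the bookkeeping visible. This is only a sketch under simplifying assumptions: it uses list indexes instead of real message IDs, handles a single consumer group, and the class ConsumerGroupSketch with its methods readNew, ack and pendingCount is hypothetical, not part of Redis or Redisson.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConsumerGroupSketch {
    private final List<String> stream = new ArrayList<>();              // messages in arrival order
    private int nextUndelivered = 0;                                    // plays the role of last_delivered_id
    private final Map<Integer, String> pending = new LinkedHashMap<>(); // message index -> owning consumer

    public void add(String message) {
        stream.add(message);
    }

    /** Imitates XREADGROUP ... COUNT 1 ... >: hands the next never-delivered message to the consumer. */
    public int readNew(String consumer) {
        if (nextUndelivered >= stream.size()) {
            return -1; // nothing new to deliver
        }
        int index = nextUndelivered++;
        pending.put(index, consumer); // read but not yet acknowledged
        return index;
    }

    /** Imitates XACK: removes the message from the pending list only; the stream keeps it. */
    public boolean ack(int index) {
        return pending.remove(index) != null;
    }

    /** Imitates the first field of the XPENDING summary. */
    public int pendingCount() {
        return pending.size();
    }

    public String get(int index) {
        return stream.get(index);
    }

    public int size() {
        return stream.size();
    }

    public static void main(String[] args) {
        ConsumerGroupSketch group = new ConsumerGroupSketch();
        group.add("zhangsan");
        group.add("lisi");
        group.add("bigold");

        int first = group.readNew("consumer1");  // consumer1 gets zhangsan
        int second = group.readNew("consumer2"); // consumer2 gets lisi, never zhangsan
        System.out.println(group.get(first) + ", " + group.get(second)); // zhangsan, lisi
        System.out.println(group.pendingCount()); // 2

        group.ack(first);
        group.ack(second);
        System.out.println(group.pendingCount()); // 0
        System.out.println(group.size());         // 3
    }
}
```

The last two lines of main show the point worth remembering: acknowledging empties the pending list, while the stream still holds all three messages.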

Redisson in practice
Add the dependency with Maven
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.16.7</version>
</dependency>
Add the Redis configuration. Code brother's Redis has no password configured; configure yours according to your actual setup.
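spring:
  application:
    name: redission
  redis:
    host: 127.0.0.1
    port: 6379
    ssl: false
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RStream;
import org.redisson.api.RedissonClient;
import org.redisson.api.StreamMessageId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class QueueService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Send messages to the queue.
     *
     * @param message not used in this demo; fixed sample fields are written instead
     */
    public void sendMessage(String message) {
        RStream<String, String> stream = redissonClient.getStream("sensor#4921");
        // Each add() appends one entry with a single field/value pair.
        stream.add("speed", "19");
        stream.add("velocity", "39%");
        stream.add("temperature", "10C");
    }

    /**
     * Consume messages through a consumer group.
     *
     * @param message not used in this demo
     */
    public void consumerMessage(String message) {
        RStream<String, String> stream = redissonClient.getStream("sensor#4921");
        // Create the group starting from the first message (0-0).
        // Redis answers BUSYGROUP if the group already exists,
        // so real code should create the group only once.
        stream.createGroup("sensors_data", StreamMessageId.ALL);
        // Read messages not yet delivered to any consumer of the group.
        Map<StreamMessageId, Map<String, String>> messages = stream.readGroup("sensors_data", "consumer_1");
        for (Map.Entry<StreamMessageId, Map<String, String>> entry : messages.entrySet()) {
            Map<String, String> msg = entry.getValue();
            System.out.println(msg);
            // Acknowledge after processing so the message leaves the pending list.
            stream.ack("sensors_data", entry.getKey());
        }
    }
}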