Don't use a Redis List to implement a message queue: Stream is designed for queues
2022-06-11 09:23:00 【CSDN cloud computing】

author | Code byte
source | Code byte
Implementing a message queue with a Redis List has many limitations, such as:
There is no good ACK mechanism;
There is no concept of a ConsumerGroup (consumer group);
Messages pile up;
A List is a linear structure, so finding a specific item requires traversing the whole list.
Stream is a data type introduced in Redis 5.0 and designed specifically for message queues. A Stream is an ordered queue containing zero or more elements, and these elements are ordered by ID.
It implements most of the features of a message queue:
Serialized generation of message IDs;
Message traversal;
Blocking and non-blocking reads of messages;
Consumer Groups;
An ACK acknowledgement mechanism;
Multicast support.
It provides many message queue commands and borrows Kafka's concept of Consumer Groups to offer consumer group functionality.
It also provides message persistence and master-slave replication. Clients can access the data at any time, and the position each client has read up to is remembered, so messages are not lost.
Enough talk. Let's see how to use it; see the official documentation for details: https://redis.io/topics/streams-intro
XADD: Insert message
As Yunshan's last word fell, the tension that filled the air suddenly broke. The elders of the Yunlan Sect spread the wings on their backs and whistled across the sky in pursuit of Xiao Yan.
Yunshan uses the following command to insert the order 「Chase Xiao Yan」 into the queue, so that the elders can lead their disciples to carry it out.
XADD Yunlanzong * task kill name Xiaoyan
"1645936602161-0"
Each element in a Stream consists of key-value pairs, and different elements may contain different numbers of key-value pairs.
The syntax of the command is as follows:
XADD streamName id field value [field value ...]
The 「*」 after the queue name tells Redis to generate a unique ID for the inserted message automatically; you can also specify the ID yourself.
A message ID consists of two parts:
The current timestamp in milliseconds;
A sequence number, starting from 0, used to distinguish multiple messages generated within the same millisecond.
By tying element IDs to time and forcing each new element's ID to be greater than the IDs of existing elements, Redis logically turns the stream into an append-only data structure.
This property matters a great deal to users who build message queues and event systems on streams:
Users can be sure that new messages and events will only appear after existing ones, just as in the real world new events always happen after existing events, and everything stays in order.
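The two-part ID and the "always greater than the previous ID" rule can be sketched in a few lines of Java. This is only an in-memory model, not Redis source code: the class name StreamIdSketch and its methods are made up for illustration, and the choice to keep the last timestamp when the clock does not advance is an assumption of the sketch.

```java
// Hypothetical in-memory model of stream ID generation; not Redis source code.
final class StreamIdSketch {
    private long lastMs = 0;   // millisecond part of the last ID issued
    private long lastSeq = -1; // sequence part of the last ID issued

    /** Returns the next ID in "<milliseconds>-<sequence>" form for the given clock reading. */
    synchronized String nextId(long nowMs) {
        if (nowMs > lastMs) {
            // A new millisecond: restart the sequence at 0.
            lastMs = nowMs;
            lastSeq = 0;
        } else {
            // Same millisecond (or a clock that went backwards): keep the
            // last timestamp and bump the sequence so the ID still grows.
            lastSeq++;
        }
        return lastMs + "-" + lastSeq;
    }

    /** Compares two IDs by timestamp first, then by sequence number. */
    static int compare(String a, String b) {
        String[] pa = a.split("-");
        String[] pb = b.split("-");
        int byMs = Long.compare(Long.parseLong(pa[0]), Long.parseLong(pb[0]));
        return byMs != 0 ? byMs : Long.compare(Long.parseLong(pa[1]), Long.parseLong(pb[1]));
    }
}
```

Because every ID issued compares greater than the one before it, appending is the only way the sequence of IDs can grow, which is the append-only property described above.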
XREAD: Read message
Old dog Yunling uses the following command to receive Yunshan's order:
XREAD COUNT 1 BLOCK 0 STREAMS Yunlanzong 0-0
1) 1) "Yunlanzong"
   2) 1) 1) "1645936602161-0"
         2) 1) "task"
            2) "kill"
            3) "name"
            4) "Xiaoyan"
The syntax is as follows:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
This command can read several streams at once. The parameters have the following meanings:
COUNT: the maximum number of elements to read from each stream;
BLOCK: blocking read. When the queue has no messages, the command blocks and waits; the value is in milliseconds, and 0 means wait forever.
ID: the message ID. When reading, you can specify an ID and reading starts from the message after that ID; 0-0 means start from the first element.
To consume sequentially with XREAD, remember the ID of the last message returned by each read and pass it as the ID argument of the next XREAD call; that call then continues with the following messages.
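The "remember the last ID and pass it to the next call" loop can be sketched as follows. The class CursorReadSketch is a hypothetical in-memory stand-in for a stream, so that the loop can be followed without a Redis server; readAfter only imitates the effect of XREAD COUNT n with an ID argument.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a stream; the real work is done by Redis.
final class CursorReadSketch {
    // Entries kept in insertion order, which is also ID order for a stream.
    private final Map<String, String> entries = new LinkedHashMap<>();

    void add(String id, String payload) {
        entries.put(id, payload);
    }

    /** Imitates XREAD COUNT n with an ID: up to n IDs strictly greater than lastId. */
    List<String> readAfter(String lastId, int count) {
        List<String> out = new ArrayList<>();
        for (String id : entries.keySet()) {
            if (out.size() == count) break;
            if (compare(id, lastId) > 0) out.add(id);
        }
        return out;
    }

    /** Drains the stream in batches, feeding the last returned ID into the next call. */
    List<String> consumeAll(int batchSize) {
        List<String> seen = new ArrayList<>();
        String cursor = "0-0"; // start from the first element
        while (true) {
            List<String> batch = readAfter(cursor, batchSize);
            if (batch.isEmpty()) return seen;
            seen.addAll(batch);
            cursor = batch.get(batch.size() - 1); // remember the last ID returned
        }
    }

    /** Compares two IDs by timestamp first, then by sequence number. */
    static int compare(String a, String b) {
        String[] pa = a.split("-");
        String[] pb = b.split("-");
        int byMs = Long.compare(Long.parseLong(pa[0]), Long.parseLong(pb[0]));
        return byMs != 0 ? byMs : Long.compare(Long.parseLong(pa[1]), Long.parseLong(pb[1]));
    }
}
```

Note that the cursor lives in the client here: nothing on the server side knows how far this reader has got, which is one reason a plain XREAD loop is not yet a full message queue.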
Sect Master Yunyun, I only arrived at Yunlanzong today. I don't want to receive historical messages; I only want the messages published with XADD from the moment I start blocking on XREAD. What should I do?
Running the command below is enough. The 「$」 symbol at the end means read only the newest messages with a blocking read; if nothing can be read, the command waits forever.
While it is waiting, any message that other elders append to the queue is read immediately.
XREAD COUNT 1 BLOCK 0 STREAMS Yunlanzong $
Is implementing a message queue really that easy? What about the promised ACK mechanism?
That was just an appetizer. Data read with XREAD is not actually deleted; when XREAD COUNT 2 BLOCK 0 STREAMS Yunlanzong 0-0 is executed again, the same data is read again.
So we still need an ACK mechanism.
Next, let's build a real message queue.
ConsumerGroup
The ConsumerGroup (consumer group) of a Redis Stream lets users logically divide one stream into several separate streams, each handled by the consumers of a ConsumerGroup.
It is a powerful, persistent message queue that supports multicast. Redis Stream borrows from the design of Kafka.
Stream high availability is built on master-slave replication, which works no differently from the replication of other data structures; that is, Stream supports high availability in Sentinel and Cluster environments.

The structure of a Redis Stream is shown in the figure above. There is a linked list of messages, and every message has a unique ID and its corresponding content;
Messages are persisted;
The state of each consumer group is independent and does not affect the others; messages in the same Stream are consumed by every consumer group;
A consumer group can consist of multiple consumers, which compete with one another; any consumer reading a message moves last_delivered_id forward;
Each consumer has a pending_ids variable that records the messages the consumer has read but not yet acknowledged. It is used to ensure that each message is consumed by a client at least once.
A message queue built on consumer groups mainly involves the following three commands:
XGROUP creates, destroys and manages consumer groups.
XREADGROUP reads data from the stream through a consumer group.
XACK lets a consumer mark a pending message as correctly processed.
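The bookkeeping described above (a shared last_delivered_id, per-consumer pending entries, and acknowledgement) can be modelled in plain Java. This is a simplified sketch under stated assumptions, not how Redis stores a group internally: GroupSketch and its method names are hypothetical, an index into a list stands in for last_delivered_id, and one map stands in for all consumers' pending_ids.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one consumer group's bookkeeping; names are illustrative.
final class GroupSketch {
    private final List<String> streamIds;  // message IDs in stream order
    private int lastDelivered = -1;        // index standing in for last_delivered_id
    private final Map<String, String> pending = new LinkedHashMap<>(); // message ID -> consumer

    GroupSketch(List<String> streamIds) {
        this.streamIds = new ArrayList<>(streamIds);
    }

    /** Imitates XREADGROUP with ">": hands the next never-delivered message to this consumer. */
    String readNew(String consumer) {
        if (lastDelivered + 1 >= streamIds.size()) return null; // nothing new
        String id = streamIds.get(++lastDelivered); // any read moves the group cursor forward
        pending.put(id, consumer);                  // read but not yet acknowledged
        return id;
    }

    /** Imitates XACK: drops acknowledged IDs from the pending entries, returns how many were dropped. */
    int ack(String... ids) {
        int removed = 0;
        for (String id : ids) {
            if (pending.remove(id) != null) removed++;
        }
        return removed;
    }

    /** IDs this consumer has read but not yet acknowledged. */
    List<String> pendingFor(String consumer) {
        List<String> ids = new ArrayList<>();
        for (Map.Entry<String, String> e : pending.entrySet()) {
            if (e.getValue().equals(consumer)) ids.add(e.getKey());
        }
        return ids;
    }
}
```

Since both consumers advance the same cursor, a message handed to one consumer is never handed to another consumer of the same group, while a second GroupSketch over the same IDs would deliver every message again to its own consumers.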
Create a consumer group
A consumer group (Consumer Group) is created on a Stream with the XGROUP CREATE command, which takes a starting message ID used to initialize the last_delivered_id variable.
We use XADD to insert some messages into the bossStream queue:
XADD bossStream * name zhangsan age 26
XADD bossStream * name lisi age 2
XADD bossStream * name bigold age 40
The following commands create two consumer groups, 「Qinglongmen」 and 「SixDoors」, for the message queue named bossStream.
# The syntax is as follows
# XGROUP CREATE stream group start_id
XGROUP CREATE bossStream Qinglongmen 0-0 MKSTREAM
XGROUP CREATE bossStream SixDoors 0-0 MKSTREAM
stream: the name of the queue;
group: the name of the consumer group;
start_id: the ID in the Stream at which the consumer group starts; it determines the ID after which the group begins reading messages. 0-0 means start from the first message; $ means start from the last one and receive only new messages.
MKSTREAM: by default, XGROUP CREATE returns an error when the target stream does not exist. Pass the optional MKSTREAM subcommand as the last argument to create the stream automatically.
Read message
Let consumer1 of the 「Qinglongmen」 consumer group read one message from bossStream with a blocking read:
XREADGROUP GROUP Qinglongmen consumer1 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
   2) 1) 1) "1645957821396-0"
         2) 1) "name"
            2) "zhangsan"
            3) "age"
            4) "26"
The syntax is as follows:
XREADGROUP GROUP groupName consumerName [COUNT n] [BLOCK ms] STREAMS streamName [stream ...] id [id ...]
Parameters in [] are optional. The command is much the same as XREAD; the difference is the new GROUP groupName consumerName option.
The two arguments of this option specify the consumer group to read from and the consumer responsible for processing the message.
Where:
>: the last argument of the command, meaning start reading from messages that have not yet been consumed;
BLOCK: blocking read.
Key point
Once a message in the queue has been consumed by one consumer in a consumer group, it can no longer be read by the other consumers in that group.
For example, consumer2 performs a read:
XREADGROUP GROUP Qinglongmen consumer2 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
   2) 1) 1) "1645957838700-0"
         2) 1) "name"
            2) "lisi"
            3) "age"
            4) "2"
consumer2 can no longer read zhangsan; it reads the next message, lisi, because zhangsan has already been read by consumer1.
Another purpose of a consumer group is to let the consumers in the group share the work of reading messages: each consumer reads part of the messages, which achieves load balancing.
For example, a consumer group has three consumers C1, C2 and C3, and a stream containing messages 1, 2, 3, 4, 5, 6 and 7:

XPENDING: view messages that were read but not acknowledged
To make sure messages can still be read after a consumer fails or goes down during consumption, the Stream keeps an internal queue (the pending list) holding the messages each consumer has read but not yet acknowledged with ACK.
If a consumer reads a message with XREADGROUP GROUP groupName consumerName but does not send an XACK command to the Stream, the message stays in that list.
For example, to view the read-but-unacknowledged messages of each consumer in the consumer group 「Qinglongmen」 of bossStream:
XPENDING bossStream Qinglongmen
1) (integer) 2
2) "1645957821396-0"
3) "1645957838700-0"
4) 1) 1) "consumer1"
      2) "1"
   2) 1) "consumer2"
      2) "1"
1) is the number of unacknowledged messages;
2) ~ 3) are the smallest and largest IDs among the messages read by all consumers in Qinglongmen;
4) lists each consumer with its number of unacknowledged messages.
To see which messages consumer1 has read, use the following command:
XPENDING bossStream Qinglongmen - + 10 consumer1
1) 1) "1645957821396-0"
   2) "consumer1"
   3) (integer) 3758384
   4) (integer) 1
ACK confirmation
So once a message has been received and consumed successfully, we need to ACK it manually to notify the Stream; the message is then removed from the pending list (XACK does not delete the entry from the stream itself). The command is as follows:
XACK bossStream Qinglongmen 1645957821396-0 1645957838700-0
(integer) 2
The syntax is as follows:
XACK key group-key ID [ID ...]
Consumer acknowledgement increases message reliability. Generally, after business processing completes, the consumer executes ack to confirm that the message has been consumed. The whole flow is shown in the figure below:

Redisson in practice
Add the dependency with Maven:
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.16.7</version>
</dependency>
Add the Redis configuration. My Redis has no password configured; adjust the settings to your own environment.
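spring:
  application:
    name: redission
  redis:
    host: 127.0.0.1
    port: 6379
    ssl: false
Then write a service that sends and consumes messages:
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RStream;
import org.redisson.api.RedissonClient;
import org.redisson.api.StreamMessageId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class QueueService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Send messages to the queue.
     *
     * @param message not used by this demo, which sends fixed sample fields
     */
    public void sendMessage(String message) {
        RStream<String, String> stream = redissonClient.getStream("sensor#4921");
        // Each add() call appends one entry holding a single field-value pair.
        stream.add("speed", "19");
        stream.add("velocity", "39%");
        stream.add("temperature", "10C");
    }

    /**
     * Consume messages as a member of a consumer group.
     *
     * @param message not used by this demo
     */
    public void consumerMessage(String message) {
        RStream<String, String> stream = redissonClient.getStream("sensor#4921");
        // Create the group so that it receives all entries in the stream.
        // Redis rejects creating a group that already exists, so in a real
        // service this should run only once, for example at startup.
        stream.createGroup("sensors_data", StreamMessageId.ALL);
        // Read messages for consumer_1 of the sensors_data group.
        Map<StreamMessageId, Map<String, String>> messages = stream.readGroup("sensors_data", "consumer_1");
        for (Map.Entry<StreamMessageId, Map<String, String>> entry : messages.entrySet()) {
            Map<String, String> msg = entry.getValue();
            System.out.println(msg);
            // Acknowledge the message once it has been processed.
            stream.ack("sensors_data", entry.getKey());
        }
    }
}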