当前位置:网站首页>Don't use redis list to implement message queue. Stream is designed for queues

Don't use redis list to implement message queue. Stream is designed for queues

2022-06-11 09:23:00 CSDN cloud computing

c73eb23fbcd1be6406bcf5f95fe801e3.gif

author | Code byte

source | Code byte

Use Redis Of List Implementing message queuing has many limitations , such as :

  • There is no good ACK Mechanism ;

  • No, ConsumerGroup Consumer group concept ;

  • A pile of news .

  • List It's a linear structure , To query the specified data, you need to traverse the whole list ;

Stream yes Redis 5.0 A data type specially designed for message queue is introduced ,Stream Is a containing 0 An ordered queue of one or more elements , These elements are based on ID In order of size .

It implements most of the functions of message queuing :

  • news ID Serialization generation ;

  • Message traversal ;

  • Blocking and non blocking reads of messages ;

  • Consumer Groups Consumer groups ;

  • ACK Acknowledgement mechanism .

  • Support multicast .

Provides many message queue operation commands , And learn from Kafka Of Consumer Groups The concept of , Provides the function of consumption group .

At the same time, it provides message persistence and master-slave replication mechanism , The client can access data at any time , And remember the location of each client's access , So as to ensure that the message is not lost .

Don't talk nonsense , Let's see how to use , See the official website for details :https://redis.io/topics/streams-intro

XADD: Insert message

When the last word of Yunshan falls , The tension that pervaded , It was suddenly broken , The wings of the cloud and lanzong elders fluttered behind them , It's a whizzing across the sky , Chase Xiao Yan .

Yunshan uses the following command to insert... Into the queue 「 Chase Xiao Yan 」 command , Let the elder lead his children to carry out .

XADD  Yunlanzong  * task kill name  Xiaoyan 
"1645936602161-0"

Stream Each element in consists of a key value pair , No The same element can contain different numbers of key value pairs .

The syntax of the command is as follows :

XADD streamName id field value [field value ...]

After the message queue name 「*」 , Let go Redis Automatically generate unique for inserted messages ID, Of course, you can define it yourself .

news ID It's made up of two parts :

  • Timestamp in the current millisecond ;

  • Sequence number . from 0 As the starting value , Used to distinguish multiple commands generated at the same time .

*

By putting elements ID Relate to time , And force the of new elements ID Must be greater than the of the old element ID, Redis Logically, it becomes a kind of operation that only performs append (append only) Data structure of .

This feature is very important for users who use streams to implement message queues and event systems :

Users can be sure that , New messages and events will only appear after existing messages and events , Just like in the real world, new events always happen after existing events , Everything is in order .

XREAD: Read message

Yunling old dog uses the following command to receive Yunshan's command :

XREAD COUNT 1 BLOCK 0 STREAMS  Yunlanzong  0-0
1) 1) "\xe4\xba\x91\xe5\xb2\x9a\xe5\xae\x97"
   2) 1) 1) "1645936602161-0"
         2) 1) "task"
            2) "kill"
            3) "name"
            4) " Xiaoyan " #  Xiaoyan 

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

This instruction can read multiple streams at the same time , The corresponding meanings of each mental method are as follows :

  • COUNT: Indicates the maximum number of elements read in each stream ;

  • BLOCK: Block read , When there is no message in the message queue , Then block and wait , 0 It means infinite waiting , In milliseconds .

  • ID: news ID, When reading messages, you can specify ID, And from this ID The next message starts reading ,0-0 It means reading from the first element .

If you want to use XREAD Sequential consumption , Remember the returned message after each reading ID, Next call XREAD The last returned message ID Pass it as a parameter to the next call, and you can continue to consume subsequent messages .

*

Lord yunyun , I just arrived at yunlanzong today , The news of history will not be answered , Just want to accept my use XREAD The moment of blocking waiting begins to pass XADD How to adjust the news released ?

function 「」 Mental skill is enough , The last of the mental method 「」 The symbol indicates reading the latest blocking message , If you can't read it, you'll wait forever .

In the process of waiting , Other elders append messages to the queue , Will immediately read .

XREAD COUNT 1 BLOCK 0 STREAMS  Yunlanzong  $

*

Is it so easy to implement message queuing ? Well said ACK Mechanism? ?

It's just an appetizer , adopt XREAD The read data is not actually deleted , When reexecuting XREAD COUNT 2 BLOCK 0 STREAMS Yunlanzong 0-0 The command will be read again .

So we still need ACK Mechanism ,

Next , Let's have a real message queue .

ConsumerGroup

Redis Stream Of ConsumerGroup( Consumer group ) Allows users to logically divide a flow into multiple different flows , And let ConsumerGroup Consumers to deal with .

It's a powerful Persistent message queue supporting multicast .Redis Stream Learn from it Kafka The design of the .

Stream High availability is based on master-slave replication , It is no different from the replication mechanism of other data structures , That is to say Sentinel and Cluster In a cluster environment Stream It can support high availability .

d32d64b2e0668c261f8815321b607725.png

Redis-Stream
  • Redis Stream Its structure is shown in the figure above . There is a message linked list , Every message has a unique ID And the corresponding content ;

  • Message persistence ;

  • The status of each consumer group is independent , No, it doesn't affect , The same Stream Messages will be consumed by all consumer groups ;

  • A consumer group can be composed of multiple consumers , There is a competitive relationship between consumers , Any consumer reading the message will make last_deliverd_id Move forward ;

  • Every consumer has one pending_ids Variable , Used to record that the current consumer has read but has not yet ack The news of . It is used to ensure that the message is consumed by the client at least once .

The message queue implemented by the consumer group mainly involves the following three instructions :

  • XGROUP Used to create 、 Destroy and manage consumer groups .

  • XREADGROUP Read data from the stream through the consumption group .

  • XACK Is a command that allows the consumer to mark the pending message as correctly processed .

Create a consumption group

Stream adopt XGROUP CREATE Instruction create consumption group (Consumer Group), Start message needs to be delivered ID Parameters are used to initialize last_delivered_id Variable .

We use XADD Go to bossStream Insert some messages into the queue :

XADD bossStream * name zhangsan age 26
XADD bossStream * name lisi age 2
XADD bossStream * name bigold age 40

The following instructions , The message queue is named bossStream establish 「 Qinglongmen 」 and 「 Six doors 」 Two consumer groups .

#  The grammar is as follows 
# XGROUP CREATE stream group start_id
XGROUP CREATE bossStream  Qinglongmen  0-0 MKSTREAM
XGROUP CREATE bossStream  Six doors  0-0 MKSTREAM
  • stream: Specify the name of the queue ;

  • group: Specify the name of the consumer group ;

  • start_id: Specify the consumption group in Stream The beginning of ID, It determines from which consumer group ID Then start reading messages ,0-0 Start reading from the first , $ Indicates that the reading starts from the last one , Only receive new messages .

  • MKSTREAM: By default ,XGROUP CREATE The command returns an error when the target stream does not exist . Optional... Can be used MKSTREAM Subcommand as After the last parameter to automatically create the flow .

Read message

Give Way 「 Qinglongmen 」 Consumer group consumer1 from bossStream Block to read a message :

XREADGROUP GROUP  Qinglongmen  consumer1 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
   2) 1) 1) "1645957821396-0"
         2) 1) "name"
            2) "zhangsan"
            3) "age"
            4) "26"

The grammar is as follows :

XREADGROUP GROUP groupName consumerName [COUNT n] [BLOCK ms] STREAMS streamName [stream ...] id [id ...]

[] The in represents optional parameters , The command and XREAD Be the same in essentials while differing in minor points , The difference is that new GROUP groupName consumerName Options .

The two parameters of this option are used to specify the consumer group to be read and the consumer responsible for processing the message .

among :

  • >: The last argument of the command >, Indicates to start reading from a message that has not been consumed ;

  • BLOCK: Block read ;

Knock on the blackboard

If a message in the message queue is consumed by a consumer in the consumer group , This message will no longer be read by other consumers in this consumer group .

such as consumer2 Perform read operations :

XREADGROUP GROUP  Qinglongmen  consumer2 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
   2) 1) 1) "1645957838700-0"
         2) 1) "name"
            2) "lisi"
            3) "age"
            4) "2"

consumer2 Can no longer read zhangsan 了 , Instead, read the next lisi Because the news has been consumer1 Read out .

Another purpose of using consumers is to enable multiple consumers in the group to share reading messages , That is, each consumer reads some messages , So as to realize load balancing .

For example, a consumer group has three consumers C1、C2、C3 And a message containing 1、2、3、4、5、6、7 The flow of :

54c3ed3266dc9b021a6737fb59c4fd09.png

XPENDING View read unconfirmed messages

In order to ensure that consumers can still read messages after failure or downtime during consumption ,Stream There's a queue inside (pending List) Save each consumer read but not yet executed ACK The news of .

If consumers use XREADGROUP GROUP groupName consumerName Read message , But not to Stream send out XACK command , The message remains .

For example. bossStream Medium Consumer groups 「 Qinglongmen 」 Each consumer in has read the unconfirmed message information :

XPENDING bossStream  Qinglongmen 
1) (integer) 2
2) "1645957821396-0"
3) "1645957838700-0"
4) 1) 1) "consumer1"
      2) "1"
   2) 1) "consumer2"
      2) "1"
  1. 1) Number of unacknowledged messages ;

  2. 2) ~ 3) The messages read by all consumers in qinglongmen are the smallest and the largest ID;

see consumer1 What data was read , Use the following command :

XPENDING bossStream  Qinglongmen  - + 10 consumer1
1) 1) "1645957821396-0"
   2) "consumer1"
   3) (integer) 3758384
   4) (integer) 1

ACK confirm

So when the message is received and the consumption is successful , We need to manually ACK notice Streams, This message will be deleted . The order is as follows :

XACK bossStream  Qinglongmen  1645957821396-0 1645957838700-0
(integer) 2

The grammar is as follows :

XACK key group-key ID [ID ...]

Consumer confirmation increases the reliability of messages , Generally, after the completion of business processing , You need to perform ack Confirm that the message has been consumed , The execution of the whole process is shown in the figure below :

2432f18e77bb48aed6574a05af269b85.png

Stream Overall process

Use Redisson actual combat

Use maven Add dependency

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>3.16.7</version>
</dependency>

add to Redis To configure , Code brother's Redis No password configured , You can configure it according to the actual situation .

spring:
  application:
    name: redission
  redis:
    host: 127.0.0.1
    port: 6379
    ssl: false
@Slf4j
@Service
public class QueueService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     *  Send message to queue 
     *
     * @param message
     */
    public void sendMessage(String message) {
        RStream<String, String> stream = redissonClient.getStream("sensor#4921");
        stream.add("speed", "19");
        stream.add("velocity", "39%");
        stream.add("temperature", "10C");
    }

    /**
     *  Consumer consumption news 
     *
     * @param message
     */
    public void consumerMessage(String message) {
        RStream<String, String> stream = redissonClient.getStream("sensor#4921");

        stream.createGroup("sensors_data", StreamMessageId.ALL);

        Map<StreamMessageId, Map<String, String>> messages = stream.readGroup("sensors_data", "consumer_1");
        for (Map.Entry<StreamMessageId, Map<String, String>> entry : messages.entrySet()) {
          Map<String, String> msg = entry.getValue();
          System.out.println(msg);

          stream.ack("sensors_data", entry.getKey());
        }

    }

}

4f88bfaf079011612de39a5c677b9770.gif

Previous recommendation

from 40% Fell to 4%,“ paste ” the Firefox Can you return to the top ?

Gartner Release 2022 Five major technological trends in the automotive industry in

Use this library , Let your service operate Redis Speed up

comic : What is? “ Low code ” Development platform ?

03ea9041ce84d3f7d92dd74f233ccc9e.gif

Share

c86e5dfd40623f77005ec79c5c6a0048.gif

Point collection

6e3696d4303956435d10ce1a929db287.gif

A little bit of praise

29efa79d9e11b0540bac579fbde8a3ae.gif

Click to see

原网站

版权声明
本文为[CSDN cloud computing]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/03/202203012301392681.html