当前位置:网站首页>Use jedis to monitor redis stream to realize message queue function

Use jedis to monitor redis stream to realize message queue function

2022-06-26 05:20:00 Huhailong blog

brief introduction

An earlier article used SpringBoot to listen to a Redis Stream and implement a message queue. This time the same thing is done with Jedis, which also leaves room for further extension, because I find Jedis more flexible than the earlier approach. This implementation can listen with multiple threads.

Link to the earlier SpringBoot-based article:
SpringBoot Use in Redis Stream Realize message monitoring

Video demo

Demo: using Jedis to listen to a Redis Stream yourself and achieve the effect of a message queue

How it works

This time, listening is implemented in two ways: through consumer groups and consumers (xreadgroup), and through native listening with plain xread. The difference is that listening through a group and its consumers ensures that each message is delivered to only one consumer in the group, so messages are not consumed repeatedly; this suits scenarios that require each record to be handled exactly once, such as persisting data to a database or similar operations. With the default xread approach, if several threads are listening, all of them receive the same newly inserted message, which can be understood as receiving messages by broadcast.

This is mainly based on the following Redis Stream commands and their corresponding Jedis methods:

  • xadd: Add data (append an entry to the stream)
  • xread: Reading data
  • xgroup: Create groups
  • xreadgroup: Read group messages

The key to reading is the block option: setting block to 0 means the call blocks until a new message is received. I put this step inside a loop, so that after a message is received the code enters the next blocking read, which achieves the listening effect.

Implementation code

The demo code is simple and lives in a single class. As long as you have a Redis server, you can run it directly after modifying the configuration in the code; you do not need to create the stream or the group manually.

pom.xml file

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the JRedisMQ demo: a single-module project whose only job is to
     pull in the libraries used by the Jedis stream listener example. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates -->
    <groupId>vip.huhailong</groupId>
    <artifactId>JRedisMQ</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Compile for Java 8 and keep the Jedis version in one place -->
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <jedis.version>4.2.3</jedis.version>
    </properties>

    <!-- Dependencies: Jedis (Redis client), SLF4J + Logback (logging), fastjson (JSON output).
         NOTE(review): the demo also imports org.apache.commons.pool2, which is not declared here;
         presumably it arrives transitively through jedis - confirm. -->
    <dependencies>
        <!-- Redis client providing the stream commands used by the demo -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${jedis.version}</version>
        </dependency>
        <!-- Logging facade -->
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
        <!-- Logging backend bound to SLF4J -->
        <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.11</version>
        </dependency>
        <!-- JSON serialization of stream entry fields for log output -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.7</version>
        </dependency>
    </dependencies>

</project>

Implementation code

package jredismq.test;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;
import redis.clients.jedis.resps.StreamEntry;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

/**
 * Demo that uses Jedis to listen on a Redis Stream and use it as a simple message queue.
 *
 * <p>Two listening modes are shown: consumer-group listening ({@code XREADGROUP}), where each
 * message is handed to one consumer of the group, and plain listening ({@code XREAD}), where
 * every listener receives every new message. A third thread keeps appending entries to the
 * stream so that the listeners have something to receive.
 */
public class JedisStreamMQTest {

    private static final Logger logger = LoggerFactory.getLogger(JedisStreamMQTest.class);

    /** Delay between two writes of the producer, and back-off delay after a failed Redis call. */
    private static final long LOOP_PAUSE_MILLIS = 500L;

    /**
     * Entry point: creates the connection pool and the consumer group, starts the listeners for
     * the configured mode and then starts the producer thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Adjust the following settings to match your own environment.
        String host = "192.168.1.110";
        int port = 6379;
        int timeout = 1000;
        String password = "huhailong";
        int database = 0;

        String streamKeyName = "streamtest";
        String groupName = "testgroup";

        String[] consumerNames = {"huhailong", "xiaohu"};

        String listenerType = "DEFAULT";  // GROUP or DEFAULT

        // Create the Redis connection pool.
        JedisPool pool = new JedisPool(new GenericObjectPoolConfig<>(), host, port, timeout, password, database);

        JedisStreamMQTest test = new JedisStreamMQTest();
        test.createGroup(pool, streamKeyName, groupName); // create the group (and the stream if missing)

        if ("GROUP".equals(listenerType)) {
            test.listenerByGroup(pool, streamKeyName, groupName, consumerNames);   // listen through group + consumers
        } else {
            test.listenerDefault(pool, streamKeyName);
        }

        // Producer thread: appends the current time to the stream every LOOP_PAUSE_MILLIS.
        new Thread(() -> {
            while (pause(LOOP_PAUSE_MILLIS)) {
                // Borrow per write so that a broken connection is replaced on the next iteration
                // instead of being reused forever.
                try (Jedis jedis = pool.getResource()) {
                    Map<String, String> map = new HashMap<>();
                    map.put("currentTime", LocalDateTime.now().toString());
                    jedis.xadd(streamKeyName, map, XAddParams.xAddParams());
                } catch (Exception e) {
                    logger.error("Failed to append to stream {}", streamKeyName, e);
                }
            }
        }).start();
    }

    /**
     * Listens through a consumer group, starting one thread per consumer name. Within a group
     * each new message is delivered to a single consumer, so messages are not processed twice.
     *
     * <p>Every received entry is logged, acknowledged and then deleted from the stream.
     *
     * @param pool          connection pool; each listener holds one connection while it is blocked
     *                      in a read, so the pool must be large enough for all consumers
     * @param keyName       stream name
     * @param groupName     consumer group name
     * @param consumerNames names of the consumers to start; nothing is started when empty
     */
    private void listenerByGroup(JedisPool pool, String keyName, String groupName, String... consumerNames) {
        if (consumerNames == null || consumerNames.length == 0) {
            logger.warn("No consumer names given, no group listener started for stream {}", keyName);
            return;
        }
        // ">" (UNRECEIVED_ENTRY) asks for entries never delivered to any consumer of the group.
        // The map is only read after this point, so sharing it between threads is safe.
        Map<String, StreamEntryID> entryIDMap = new HashMap<>();
        entryIDMap.put(keyName, StreamEntryID.UNRECEIVED_ENTRY);

        // Plain threads keep the demo short; a thread pool would be used in real code.
        for (String consumerName : consumerNames) {
            new Thread(() -> {
                boolean running = true;
                while (running) {
                    try (Jedis jedis = pool.getResource()) {
                        // block(0) waits until at least one entry is available.
                        List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xreadGroup(
                                groupName, consumerName, XReadGroupParams.xReadGroupParams().block(0), entryIDMap);
                        if (entries == null) {
                            continue;
                        }
                        // A single read may return several entries; handle all of them.
                        for (Map.Entry<String, List<StreamEntry>> streamEntries : entries) {
                            for (StreamEntry entry : streamEntries.getValue()) {
                                logger.info("Thread:-{},result:{}", Thread.currentThread().getName(),
                                        JSONObject.toJSONString(entry.getFields()));
                                // Acknowledge first: XDEL alone leaves the entry in the group's
                                // pending list, which would otherwise grow without bound.
                                jedis.xack(keyName, groupName, entry.getID());
                                jedis.xdel(keyName, entry.getID());   // remove the handled message
                            }
                        }
                    } catch (Exception e) {
                        logger.error("Group listener {} failed on stream {}", consumerName, keyName, e);
                        running = pause(LOOP_PAUSE_MILLIS);   // back off instead of spinning on a dead server
                    }
                }
            }).start();
        }
    }

    /**
     * Listens with plain {@code XREAD}, without groups or consumers. Two threads are started and
     * each of them receives the new messages, i.e. messages are consumed repeatedly (broadcast).
     *
     * @param pool    connection pool; each listener holds one connection while blocked in a read
     * @param keyName stream name
     */
    private void listenerDefault(JedisPool pool, String keyName) {
        // Plain threads keep the demo short; a thread pool would be used in real code.
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                // Per-thread read position. It starts at "$" (only entries added from now on) and is
                // then advanced to the last entry seen; re-reading with "$" every time would skip
                // everything added between two reads.
                Map<String, StreamEntryID> readPosition = new HashMap<>();
                readPosition.put(keyName, StreamEntryID.LAST_ENTRY);

                boolean running = true;
                while (running) {
                    try (Jedis jedis = pool.getResource()) {
                        List<Map.Entry<String, List<StreamEntry>>> entries =
                                jedis.xread(XReadParams.xReadParams().block(0), readPosition);
                        if (entries == null) {
                            continue;
                        }
                        for (Map.Entry<String, List<StreamEntry>> streamEntries : entries) {
                            for (StreamEntry entry : streamEntries.getValue()) {
                                logger.info("Thread:-{},result:{}", Thread.currentThread().getName(),
                                        JSONObject.toJSONString(entry.getFields()));
                                // Kept from the original demo so the stream does not grow forever.
                                // NOTE(review): a listener that lags behind can miss entries another
                                // listener already deleted; trimming on XADD would avoid that.
                                jedis.xdel(keyName, entry.getID());
                                readPosition.put(keyName, entry.getID());
                            }
                        }
                    } catch (Exception e) {
                        logger.error("Listener failed on stream {}", keyName, e);
                        running = pause(LOOP_PAUSE_MILLIS);   // back off instead of spinning on a dead server
                    }
                }
            }).start();
        }
    }

    /**
     * Creates the consumer group, creating the stream as well when it does not exist yet. An
     * already existing group is not treated as a failure.
     *
     * @param pool      connection pool
     * @param keyName   stream name
     * @param groupName consumer group name
     */
    private void createGroup(JedisPool pool, String keyName, String groupName) {
        // try-with-resources returns the connection to the pool; the original never released it.
        try (Jedis jedis = pool.getResource()) {
            // LAST_ENTRY makes the group receive only new messages; use ID 0 instead to replay the
            // whole history. The trailing 'true' creates the stream when it does not exist.
            jedis.xgroupCreate(keyName, groupName, StreamEntryID.LAST_ENTRY, true);
        } catch (Exception e) {
            String message = e.getMessage();
            if (message != null && message.contains("BUSYGROUP")) {
                // Expected on every run after the first one: the group already exists.
                logger.info("Consumer group {} already exists on stream {}", groupName, keyName);
            } else {
                logger.error("Failed to create consumer group {} on stream {}", groupName, keyName, e);
            }
        }
    }

    /**
     * Sleeps for the given time.
     *
     * @param millis time to sleep in milliseconds
     * @return {@code true} after a normal sleep, {@code false} when the thread was interrupted
     *         (the interrupt flag is restored so the caller can stop its loop)
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

}

This demo code is relatively simple; you can modify and encapsulate it according to your own needs. I am also continuing to explore packaging this approach into a complete project that does not rely on third-party frameworks such as Spring, so that it can be used flexibly. If you find it useful, please give it a like!

原网站

版权声明
本文为[Huhailong blog]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/177/202206260518151254.html