当前位置:网站首页>Mise en file d'attente des messages en utilisant jedis Listening redis stream

Mise en file d'attente des messages en utilisant jedis Listening redis stream

2022-06-26 05:19:00 Hu Hailong Blog

Introduction

Précédemment utiliséSpringBootVa écouter.Redis StreamMise en file d'attente des messages,Ce partage est fait avecJedisPour réaliser la même fonctionnalité,Et vous pouvez continuer à étendre la fonctionnalité,Parce queJedisJe pense que c'est encore plus flexible qu'avant.Cette mise en œuvre de l'écoute peut utiliser plusieurs Threads pour écouter.

Avant de passerSpringBootLien vers l'article de mise en oeuvre:
SpringBoot Utilisé dansRedis Stream Réaliser l'écoute des messages

Présentation vidéo

UtiliserJedisRéaliser l'écoute par soi - mêmeRedis StreamLa fonction de la file d'attente des messages atteint l'effetDemo

Principe de réalisation

Cette mise en œuvre de l'écoute est divisée en écoute de groupe et de consommateur et en utilisation de modexreadSurveillance native de,La différence est que si vous utilisez un moniteur comme un groupe et un consommateur, vous pouvez vous assurer que le message n'est consommé qu'une seule fois par le même consommateur,Pas de consommation répétée de messages,Convient aux scénarios qui exigent l'unicité des données,Comme l'entreposage ou d'autres opérations.Par défautxreadL'implémentation écoute plusieurs Threads qui reçoivent le même message inséré en même temps,Peut être compris comme une façon de recevoir des messages à la radio.

C'est principalement basé surRedis StreamLes commandes suivantes correspondent àJedisMéthode:

  • xadd:Créer un groupe
  • xread:Lire les données
  • xgroup: Créer un groupe
  • xreadgroup: Lire le message du Groupe

Ils sont principalement utilisés pour la lecture blockPropriétés,Oui.blockLa propriété est définie à0 Est bloqué jusqu'à ce qu'un nouveau message soit reçu , Et j'ai mis cette étape dans un sondage , Mise en œuvre du blocage entrée dans le prochain blocage après réception du message , Pour obtenir l'effet d'écoute .

Code de mise en œuvre

Cette foisdemoLe Code est simple, Dans une classe , Et tant qu'il y a redis Peut être exécuté directement après avoir modifié la configuration dans le Code ,Il n'est pas nécessaire de créer manuellementstream Ou groupe, etc. .

pom.xmlDocumentation

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>vip.huhailong</groupId>
    <artifactId>JRedisMQ</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <jedis.version>4.2.3</jedis.version>
    </properties>

    <!-- jedis dependency -->
    <dependencies>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${jedis.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.11</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.7</version>
        </dependency>
    </dependencies>

</project>

Code de mise en œuvre

package jredismq.test;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;
import redis.clients.jedis.resps.StreamEntry;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

/** * UtiliserjedisRéaliser l'écoutestreamMessage */
public class JedisStreamMQTest {
    

    private static final Logger logger = LoggerFactory.getLogger(JedisStreamMQTest.class);

    public static void main(String[] args) {
    
		// Ce qui suit est modifié en fonction de votre situation 
        String host = "192.168.1.110";
        int port = 6379;
        int timeout = 1000;
        String password = "huhailong";
        int database = 0;

        String streamKeyName = "streamtest";
        String groupName = "testgroup";

        String[]consumerNames = {
    "huhailong", "xiaohu"};

        String listenerType = "DEFAULT";  //GROUP or DEFAULT

        //Création redis Instance du pool de connexion
        JedisPool pool = new JedisPool(new GenericObjectPoolConfig<>(),host,port,timeout,password,database);

        JedisStreamMQTest test = new JedisStreamMQTest();
        test.createGroup(pool,streamKeyName,groupName); //Créer un groupe

        if("GROUP".equals(listenerType)){
    
            test.listenerByGroup(pool,streamKeyName,groupName,consumerNames);   // Utiliser l'écoute des groupes et des consommateurs 
        }else{
    
            test.listenerDefault(pool,streamKeyName);
        }

        new Thread(()->{
        //Thread3:Pour écrirestreamDonnées
            Jedis jedis = pool.getResource();
            while(true) {
    
                try {
    
                    Thread.sleep(500L);
                    Map<String,String> map = new HashMap<>();
                    map.put("currentTime", LocalDateTime.now().toString());
                    jedis.xadd(streamKeyName,map, XAddParams.xAddParams());
                } catch (Exception e) {
    
                    logger.error(e.getMessage());
                }
            }
        }).start();
    }

    /** *  Utiliser l'écoute des groupes et des consommateurs , L'écoute assure que les messages ne sont pas consommés en double , Parce que chaque groupe ne consomme des messages qu'une seule fois par utilisateur  * @param keyName stream Nom * @param groupName Nom du Groupe * @param consumerNames  Collection de noms de consommateurs  */
    private void listenerByGroup(JedisPool pool, String keyName, String groupName, String...consumerNames){
    
        Map<String, StreamEntryID> entryIDMap = new HashMap<>();
        entryIDMap.put(keyName,StreamEntryID.UNRECEIVED_ENTRY);
        // Ce qui suit n'utilise pas de pool de Threads pour démontrer la simplicité , Créer deux Threads directement pour expliquer le problème 
        IntStream.range(0,2).forEach(i->{
    
            Jedis jedis = pool.getResource();   //CréationjedisExemple
            new Thread(()->{
    
                while(true){
    
                    try{
    
                        Thread.sleep(500L);
                        //En bas. xreadGroup  Équivalence méthodologique et redisDansxreadgroupLes ordres,Oui.block Le temps de blocage est fixé à 0 Indique qu'il a été bloqué jusqu'à ce que le message soit reçu ,Et là - haut.StreamEntryID Set to receive the latest value 
                        List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xreadGroup(groupName, consumerNames[i], XReadGroupParams.xReadGroupParams().block(0), entryIDMap);
                        logger.info("Thread:-{},result:{}",Thread.currentThread().getName(), JSONObject.toJSONString(entries.get(0).getValue().get(0).getFields()));
// jedis.xack(keyName,groupName,entries.get(0).getValue().get(0).getID()); //Message de confirmation
                        jedis.xdel(keyName,entries.get(0).getValue().get(0).getID());   //Supprimer le message
                    } catch (Exception e){
    
                        logger.error(e.getMessage());
                    }
                }
            }).start();
        });

    }

    /** *  Lire sans utiliser le concept de groupe et de consommateur , Plusieurs Threads dupliquent les données de consommation  * @param keyName stream Nom */
    private void listenerDefault(JedisPool pool, String keyName){
    
        Map<String, StreamEntryID> entryIDMap = new HashMap<>();
        entryIDMap.put(keyName,StreamEntryID.LAST_ENTRY);
        // Ce qui suit n'utilise pas de pool de Threads pour démontrer la simplicité , Créer deux Threads directement pour expliquer le problème 
        IntStream.range(0,2).forEach(i->{
    
            new Thread(()->{
    
                Jedis jedis = pool.getResource();   //CréationjedisExemple
                while(true){
    
                    try{
    
                        Thread.sleep(500L);
                        List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xread(XReadParams.xReadParams().block(0), entryIDMap);
                        logger.info("Thread:-{},result:{}",Thread.currentThread().getName(), JSONObject.toJSONString(entries.get(0).getValue().get(0).getFields()));
                        jedis.xdel(keyName,entries.get(0).getValue().get(0).getID());
                    } catch (Exception e){
    
                        logger.error(e.getMessage());
                    }
                }
            }).start();
        });
    }

    private void createGroup(JedisPool pool, String keyName, String groupName){
    
        Jedis jedis = pool.getResource();
        try{
    
            //StreamEntryID  Représente la création d'un groupe et la réception de nouveaux messages , Ceci peut être réglé en fonction de vos besoins ,0 Indique que tous les messages historiques sont lus ,Derrière.boolean La valeur indique si stream Il n'y a pas de création stream
            jedis.xgroupCreate(keyName,groupName,StreamEntryID.LAST_ENTRY,true);
        } catch (Exception e){
    
            // L'exception a été saisie ici parce qu'il est possible que le Groupe existait déjà au moment de la création. 
            logger.error(e.getMessage());
        }
    }

}

BendemoLe Code est encore plus simple, Vous pouvez modifier et encapsuler en fonction de vos besoins . J'explore également la possibilité d'encapsuler et de peaufiner un projet complet de cette façon. , Ne dépend pas d'un cadre tiers, par exemple SpringProjets pour, Pour pouvoir l'utiliser avec souplesse , Un petit ami qui trouve ça utile. !

原网站

版权声明
本文为[Hu Hailong Blog]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/177/202206260518151254.html