当前位置:网站首页>Mise en file d'attente des messages en utilisant jedis Listening redis stream
Mise en file d'attente des messages en utilisant jedis Listening redis stream
2022-06-26 05:19:00 【Hu Hailong Blog】
Introduction
Précédemment utiliséSpringBootVa écouter.Redis StreamMise en file d'attente des messages,Ce partage est fait avecJedisPour réaliser la même fonctionnalité,Et vous pouvez continuer à étendre la fonctionnalité,Parce queJedisJe pense que c'est encore plus flexible qu'avant.Cette mise en œuvre de l'écoute peut utiliser plusieurs Threads pour écouter.
Avant de passerSpringBootLien vers l'article de mise en oeuvre:
SpringBoot Utilisé dansRedis Stream Réaliser l'écoute des messages
Présentation vidéo
UtiliserJedisRéaliser l'écoute par soi - mêmeRedis StreamLa fonction de la file d'attente des messages atteint l'effetDemo
Principe de réalisation
Cette mise en œuvre de l'écoute est divisée en écoute de groupe et de consommateur et en utilisation de modexreadSurveillance native de,La différence est que si vous utilisez un moniteur comme un groupe et un consommateur, vous pouvez vous assurer que le message n'est consommé qu'une seule fois par le même consommateur,Pas de consommation répétée de messages,Convient aux scénarios qui exigent l'unicité des données,Comme l'entreposage ou d'autres opérations.Par défautxreadL'implémentation écoute plusieurs Threads qui reçoivent le même message inséré en même temps,Peut être compris comme une façon de recevoir des messages à la radio.
C'est principalement basé surRedis StreamLes commandes suivantes correspondent àJedisMéthode:
- xadd:Créer un groupe
- xread:Lire les données
- xgroup: Créer un groupe
- xreadgroup: Lire le message du Groupe
Ils sont principalement utilisés pour la lecture blockPropriétés,Oui.blockLa propriété est définie à0 Est bloqué jusqu'à ce qu'un nouveau message soit reçu , Et j'ai mis cette étape dans un sondage , Mise en œuvre du blocage entrée dans le prochain blocage après réception du message , Pour obtenir l'effet d'écoute .
Code de mise en œuvre
Cette foisdemoLe Code est simple, Dans une classe , Et tant qu'il y a redis Peut être exécuté directement après avoir modifié la configuration dans le Code ,Il n'est pas nécessaire de créer manuellementstream Ou groupe, etc. .
pom.xmlDocumentation
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>vip.huhailong</groupId>
<artifactId>JRedisMQ</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<jedis.version>4.2.3</jedis.version>
</properties>
<!-- jedis dependency -->
<dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.7</version>
</dependency>
</dependencies>
</project>
Code de mise en œuvre
package jredismq.test;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;
import redis.clients.jedis.resps.StreamEntry;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
/** * UtiliserjedisRéaliser l'écoutestreamMessage */
public class JedisStreamMQTest {
private static final Logger logger = LoggerFactory.getLogger(JedisStreamMQTest.class);
public static void main(String[] args) {
// Ce qui suit est modifié en fonction de votre situation
String host = "192.168.1.110";
int port = 6379;
int timeout = 1000;
String password = "huhailong";
int database = 0;
String streamKeyName = "streamtest";
String groupName = "testgroup";
String[]consumerNames = {
"huhailong", "xiaohu"};
String listenerType = "DEFAULT"; //GROUP or DEFAULT
//Création redis Instance du pool de connexion
JedisPool pool = new JedisPool(new GenericObjectPoolConfig<>(),host,port,timeout,password,database);
JedisStreamMQTest test = new JedisStreamMQTest();
test.createGroup(pool,streamKeyName,groupName); //Créer un groupe
if("GROUP".equals(listenerType)){
test.listenerByGroup(pool,streamKeyName,groupName,consumerNames); // Utiliser l'écoute des groupes et des consommateurs
}else{
test.listenerDefault(pool,streamKeyName);
}
new Thread(()->{
//Thread3:Pour écrirestreamDonnées
Jedis jedis = pool.getResource();
while(true) {
try {
Thread.sleep(500L);
Map<String,String> map = new HashMap<>();
map.put("currentTime", LocalDateTime.now().toString());
jedis.xadd(streamKeyName,map, XAddParams.xAddParams());
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}).start();
}
/** * Utiliser l'écoute des groupes et des consommateurs , L'écoute assure que les messages ne sont pas consommés en double , Parce que chaque groupe ne consomme des messages qu'une seule fois par utilisateur * @param keyName stream Nom * @param groupName Nom du Groupe * @param consumerNames Collection de noms de consommateurs */
private void listenerByGroup(JedisPool pool, String keyName, String groupName, String...consumerNames){
Map<String, StreamEntryID> entryIDMap = new HashMap<>();
entryIDMap.put(keyName,StreamEntryID.UNRECEIVED_ENTRY);
// Ce qui suit n'utilise pas de pool de Threads pour démontrer la simplicité , Créer deux Threads directement pour expliquer le problème
IntStream.range(0,2).forEach(i->{
Jedis jedis = pool.getResource(); //CréationjedisExemple
new Thread(()->{
while(true){
try{
Thread.sleep(500L);
//En bas. xreadGroup Équivalence méthodologique et redisDansxreadgroupLes ordres,Oui.block Le temps de blocage est fixé à 0 Indique qu'il a été bloqué jusqu'à ce que le message soit reçu ,Et là - haut.StreamEntryID Set to receive the latest value
List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xreadGroup(groupName, consumerNames[i], XReadGroupParams.xReadGroupParams().block(0), entryIDMap);
logger.info("Thread:-{},result:{}",Thread.currentThread().getName(), JSONObject.toJSONString(entries.get(0).getValue().get(0).getFields()));
// jedis.xack(keyName,groupName,entries.get(0).getValue().get(0).getID()); //Message de confirmation
jedis.xdel(keyName,entries.get(0).getValue().get(0).getID()); //Supprimer le message
} catch (Exception e){
logger.error(e.getMessage());
}
}
}).start();
});
}
/** * Lire sans utiliser le concept de groupe et de consommateur , Plusieurs Threads dupliquent les données de consommation * @param keyName stream Nom */
private void listenerDefault(JedisPool pool, String keyName){
Map<String, StreamEntryID> entryIDMap = new HashMap<>();
entryIDMap.put(keyName,StreamEntryID.LAST_ENTRY);
// Ce qui suit n'utilise pas de pool de Threads pour démontrer la simplicité , Créer deux Threads directement pour expliquer le problème
IntStream.range(0,2).forEach(i->{
new Thread(()->{
Jedis jedis = pool.getResource(); //CréationjedisExemple
while(true){
try{
Thread.sleep(500L);
List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xread(XReadParams.xReadParams().block(0), entryIDMap);
logger.info("Thread:-{},result:{}",Thread.currentThread().getName(), JSONObject.toJSONString(entries.get(0).getValue().get(0).getFields()));
jedis.xdel(keyName,entries.get(0).getValue().get(0).getID());
} catch (Exception e){
logger.error(e.getMessage());
}
}
}).start();
});
}
private void createGroup(JedisPool pool, String keyName, String groupName){
Jedis jedis = pool.getResource();
try{
//StreamEntryID Représente la création d'un groupe et la réception de nouveaux messages , Ceci peut être réglé en fonction de vos besoins ,0 Indique que tous les messages historiques sont lus ,Derrière.boolean La valeur indique si stream Il n'y a pas de création stream
jedis.xgroupCreate(keyName,groupName,StreamEntryID.LAST_ENTRY,true);
} catch (Exception e){
// L'exception a été saisie ici parce qu'il est possible que le Groupe existait déjà au moment de la création.
logger.error(e.getMessage());
}
}
}
BendemoLe Code est encore plus simple, Vous pouvez modifier et encapsuler en fonction de vos besoins . J'explore également la possibilité d'encapsuler et de peaufiner un projet complet de cette façon. , Ne dépend pas d'un cadre tiers, par exemple SpringProjets pour, Pour pouvoir l'utiliser avec souplesse , Un petit ami qui trouve ça utile. !
边栏推荐
- app 应用安装到手机,不显示图标,引发的思考
- [geek] product manager training camp
- vscode config
- Tensorflow and deep learning day 3
- Keras actual combat cifar10 in tensorflow
- Guanghetong and anti international bring 5g R16 powerful performance to the AI edge computing platform based on NVIDIA Jetson Xavier nx
- 递归遍历目录结构和树状展现
- ECCV 2020 double champion team, take you to conquer target detection on the 7th
- Computer Vision Tools Chain
- cartographer_fast_correlative_scan_matcher_2d分支定界粗匹配
猜你喜欢

LeetCode_二叉搜索树_简单_108.将有序数组转换为二叉搜索树

ModuleNotFoundError: No module named ‘numpy‘

6.1 - 6.2 公鑰密碼學簡介

百度API地图的标注不是居中显示,而是显示在左上角是怎么回事?已解决!

Ad tutorial series | 4 - creating an integration library file

redis探索之布隆过滤器
Briefly describe the pitfalls of mobile IM development: architecture design, communication protocol and client
![[unity3d] collider assembly](/img/de/29ecf4612c540e2df715f56c31cf1a.png)
[unity3d] collider assembly

Serious hazard warning! Log4j execution vulnerability is exposed!

5. < tag stack and general problems > supplement: lt.946 Verify the stack sequence (the same as the push in and pop-up sequence of offer 31. stack)
随机推荐
瀚高数据库自定义操作符‘!~~‘
cartographer_ local_ trajectory_ builder_ 2d
Mysql 源码阅读(二)登录连接调试
创建 SSH 秘钥对 配置步骤
CMakeLists.txt Template
Classic theory: detailed explanation of three handshakes and four waves of TCP protocol
cartographer_local_trajectory_builder_2d
Vie procédurale
Serious hazard warning! Log4j execution vulnerability is exposed!
两步处理字符串正则匹配得到JSON列表
二次bootloader关于boot28.asm应用的注意事项,28035的
[quartz] read configuration from database to realize dynamic timing task
Official image acceleration
The first gift of the project, the flying oar contract!
6.1 - 6.2 introduction to public key cryptography
2. < tag dynamic programming and conventional problems > lt.343 integer partition
Technical problems to be faced in mobile terminal im development
Implementation of IM message delivery guarantee mechanism (II): ensure reliable delivery of offline messages
红队得分方法统计
vscode config