当前位置:网站首页>Mise en file d'attente des messages en utilisant jedis Listening redis stream
Mise en file d'attente des messages en utilisant jedis Listening redis stream
2022-06-26 05:19:00 【Hu Hailong Blog】
Introduction
Précédemment utiliséSpringBootVa écouter.Redis StreamMise en file d'attente des messages,Ce partage est fait avecJedisPour réaliser la même fonctionnalité,Et vous pouvez continuer à étendre la fonctionnalité,Parce queJedisJe pense que c'est encore plus flexible qu'avant.Cette mise en œuvre de l'écoute peut utiliser plusieurs Threads pour écouter.
Avant de passerSpringBootLien vers l'article de mise en oeuvre:
SpringBoot Utilisé dansRedis Stream Réaliser l'écoute des messages
Présentation vidéo
UtiliserJedisRéaliser l'écoute par soi - mêmeRedis StreamLa fonction de la file d'attente des messages atteint l'effetDemo
Principe de réalisation
Cette mise en œuvre de l'écoute est divisée en écoute de groupe et de consommateur et en utilisation de modexreadSurveillance native de,La différence est que si vous utilisez un moniteur comme un groupe et un consommateur, vous pouvez vous assurer que le message n'est consommé qu'une seule fois par le même consommateur,Pas de consommation répétée de messages,Convient aux scénarios qui exigent l'unicité des données,Comme l'entreposage ou d'autres opérations.Par défautxreadL'implémentation écoute plusieurs Threads qui reçoivent le même message inséré en même temps,Peut être compris comme une façon de recevoir des messages à la radio.
C'est principalement basé surRedis StreamLes commandes suivantes correspondent àJedisMéthode:
- xadd:Créer un groupe
- xread:Lire les données
- xgroup: Créer un groupe
- xreadgroup: Lire le message du Groupe
Ils sont principalement utilisés pour la lecture blockPropriétés,Oui.blockLa propriété est définie à0 Est bloqué jusqu'à ce qu'un nouveau message soit reçu , Et j'ai mis cette étape dans un sondage , Mise en œuvre du blocage entrée dans le prochain blocage après réception du message , Pour obtenir l'effet d'écoute .
Code de mise en œuvre
Cette foisdemoLe Code est simple, Dans une classe , Et tant qu'il y a redis Peut être exécuté directement après avoir modifié la configuration dans le Code ,Il n'est pas nécessaire de créer manuellementstream Ou groupe, etc. .
pom.xmlDocumentation
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>vip.huhailong</groupId>
<artifactId>JRedisMQ</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<jedis.version>4.2.3</jedis.version>
</properties>
<!-- jedis dependency -->
<dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.7</version>
</dependency>
</dependencies>
</project>
Code de mise en œuvre
package jredismq.test;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;
import redis.clients.jedis.resps.StreamEntry;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
/** * UtiliserjedisRéaliser l'écoutestreamMessage */
public class JedisStreamMQTest {
private static final Logger logger = LoggerFactory.getLogger(JedisStreamMQTest.class);
public static void main(String[] args) {
// Ce qui suit est modifié en fonction de votre situation
String host = "192.168.1.110";
int port = 6379;
int timeout = 1000;
String password = "huhailong";
int database = 0;
String streamKeyName = "streamtest";
String groupName = "testgroup";
String[]consumerNames = {
"huhailong", "xiaohu"};
String listenerType = "DEFAULT"; //GROUP or DEFAULT
//Création redis Instance du pool de connexion
JedisPool pool = new JedisPool(new GenericObjectPoolConfig<>(),host,port,timeout,password,database);
JedisStreamMQTest test = new JedisStreamMQTest();
test.createGroup(pool,streamKeyName,groupName); //Créer un groupe
if("GROUP".equals(listenerType)){
test.listenerByGroup(pool,streamKeyName,groupName,consumerNames); // Utiliser l'écoute des groupes et des consommateurs
}else{
test.listenerDefault(pool,streamKeyName);
}
new Thread(()->{
//Thread3:Pour écrirestreamDonnées
Jedis jedis = pool.getResource();
while(true) {
try {
Thread.sleep(500L);
Map<String,String> map = new HashMap<>();
map.put("currentTime", LocalDateTime.now().toString());
jedis.xadd(streamKeyName,map, XAddParams.xAddParams());
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}).start();
}
/** * Utiliser l'écoute des groupes et des consommateurs , L'écoute assure que les messages ne sont pas consommés en double , Parce que chaque groupe ne consomme des messages qu'une seule fois par utilisateur * @param keyName stream Nom * @param groupName Nom du Groupe * @param consumerNames Collection de noms de consommateurs */
private void listenerByGroup(JedisPool pool, String keyName, String groupName, String...consumerNames){
Map<String, StreamEntryID> entryIDMap = new HashMap<>();
entryIDMap.put(keyName,StreamEntryID.UNRECEIVED_ENTRY);
// Ce qui suit n'utilise pas de pool de Threads pour démontrer la simplicité , Créer deux Threads directement pour expliquer le problème
IntStream.range(0,2).forEach(i->{
Jedis jedis = pool.getResource(); //CréationjedisExemple
new Thread(()->{
while(true){
try{
Thread.sleep(500L);
//En bas. xreadGroup Équivalence méthodologique et redisDansxreadgroupLes ordres,Oui.block Le temps de blocage est fixé à 0 Indique qu'il a été bloqué jusqu'à ce que le message soit reçu ,Et là - haut.StreamEntryID Set to receive the latest value
List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xreadGroup(groupName, consumerNames[i], XReadGroupParams.xReadGroupParams().block(0), entryIDMap);
logger.info("Thread:-{},result:{}",Thread.currentThread().getName(), JSONObject.toJSONString(entries.get(0).getValue().get(0).getFields()));
// jedis.xack(keyName,groupName,entries.get(0).getValue().get(0).getID()); //Message de confirmation
jedis.xdel(keyName,entries.get(0).getValue().get(0).getID()); //Supprimer le message
} catch (Exception e){
logger.error(e.getMessage());
}
}
}).start();
});
}
/** * Lire sans utiliser le concept de groupe et de consommateur , Plusieurs Threads dupliquent les données de consommation * @param keyName stream Nom */
private void listenerDefault(JedisPool pool, String keyName){
Map<String, StreamEntryID> entryIDMap = new HashMap<>();
entryIDMap.put(keyName,StreamEntryID.LAST_ENTRY);
// Ce qui suit n'utilise pas de pool de Threads pour démontrer la simplicité , Créer deux Threads directement pour expliquer le problème
IntStream.range(0,2).forEach(i->{
new Thread(()->{
Jedis jedis = pool.getResource(); //CréationjedisExemple
while(true){
try{
Thread.sleep(500L);
List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xread(XReadParams.xReadParams().block(0), entryIDMap);
logger.info("Thread:-{},result:{}",Thread.currentThread().getName(), JSONObject.toJSONString(entries.get(0).getValue().get(0).getFields()));
jedis.xdel(keyName,entries.get(0).getValue().get(0).getID());
} catch (Exception e){
logger.error(e.getMessage());
}
}
}).start();
});
}
private void createGroup(JedisPool pool, String keyName, String groupName){
Jedis jedis = pool.getResource();
try{
//StreamEntryID Représente la création d'un groupe et la réception de nouveaux messages , Ceci peut être réglé en fonction de vos besoins ,0 Indique que tous les messages historiques sont lus ,Derrière.boolean La valeur indique si stream Il n'y a pas de création stream
jedis.xgroupCreate(keyName,groupName,StreamEntryID.LAST_ENTRY,true);
} catch (Exception e){
// L'exception a été saisie ici parce qu'il est possible que le Groupe existait déjà au moment de la création.
logger.error(e.getMessage());
}
}
}
BendemoLe Code est encore plus simple, Vous pouvez modifier et encapsuler en fonction de vos besoins . J'explore également la possibilité d'encapsuler et de peaufiner un projet complet de cette façon. , Ne dépend pas d'un cadre tiers, par exemple SpringProjets pour, Pour pouvoir l'utiliser avec souplesse , Un petit ami qui trouve ça utile. !
边栏推荐
- Two step processing of string regular matching to get JSON list
- Second day of deep learning and tensorfow
- Chapter 9 setting up structured logging (I)
- Vie procédurale
- cartographer_optimization_problem_2d
- [greedy college] Figure neural network advanced training camp
- Setting pseudo static under fastadmin Apache
- Pycharm package import error without warning
- [unity3d] rigid body component
- AD教程系列 | 4 - 创建集成库文件
猜你喜欢

Tp5.0框架 PDO连接mysql 报错:Too many connections 解决方法

递归遍历目录结构和树状展现

Ai+ remote sensing: releasing the value of each pixel

The localstorage browser stores locally to limit the number of forms submitted when tourists do not log in.

How to rewrite a pseudo static URL created by zenpart

Ad tutorial series | 4 - creating an integration library file

关于支付接口回调地址参数字段是“notify_url”,签名过后的特殊字符url编码以后再解码后出现错误(¬ , ¢, ¤, £)

What is UWB in ultra-high precision positioning system
![C# 39. Conversion between string type and byte[] type (actual measurement)](/img/33/046aef4e0c1d7c0c0d60c28e707546.png)
C# 39. Conversion between string type and byte[] type (actual measurement)

Recursively traverse directory structure and tree presentation
随机推荐
第九章 设置结构化日志记录(一)
Sentimentin tensorflow_ analysis_ cell
Why does the mobile IM based on TCP still need to keep the heartbeat alive?
[greedy college] Figure neural network advanced training camp
First day of deep learning and tensorflow learning
Douban top250
LeetCode 19. Delete the penultimate node of the linked list
Official image acceleration
Two step processing of string regular matching to get JSON list
【Unity3D】碰撞体组件Collider
2. < tag dynamic programming and conventional problems > lt.343 integer partition
The best Chinese open source class of vision transformer, ten hours of on-site coding to play with the popular model of Vit!
cartographer_optimization_problem_2d
ThreadPoolExecutor implements file uploading and batch inserting data
【Unity3D】刚体组件Rigidbody
[unity3d] human computer interaction input
Tensorflow and deep learning day 3
Keras actual combat cifar10 in tensorflow
GD32F3x0 官方PWM驱动正频宽偏小(定时不准)的问题
zencart新建的URL怎么重写伪静态