Using Jedis to listen to a Redis Stream as a message queue
2022-06-26 05:20:00 【Huhailong blog】
Introduction
I previously used Spring Boot to listen to a Redis Stream and get message-queue behavior. This time I do the same thing with Jedis, which I find more flexible than the earlier approach and easier to extend. This implementation can also listen from multiple threads.
The earlier Spring Boot article:
Using Redis Stream in Spring Boot to listen for messages
Video demo
Demo: listening to a Redis Stream with Jedis to get a message-queue effect
How it works
This time I implement listening in two ways: through a consumer group and its consumers (xreadgroup), and through plain xread. The difference is that with a group, each message is delivered to only one consumer in the group, so messages are not consumed repeatedly. This suits scenarios where each record must be handled exactly once, such as writing to a database. With plain xread, if several threads are listening, they all receive the same newly added message, which can be thought of as broadcasting.
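The difference between the two modes can be sketched with a small in-memory model. This is plain Java that does not talk to Redis; the class and method names (DeliveryModesSketch, broadcast, consumerGroup) are made up for illustration, and round-robin assignment stands in for "whichever consumer asks next" in a real group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** In-memory model of the two delivery modes; an illustration only, not Redis or Jedis code. */
final class DeliveryModesSketch {

    /** Plain-read mode: every reader has its own cursor, so each reader sees every entry. */
    static List<List<String>> broadcast(List<String> stream, int readers) {
        List<List<String>> received = new ArrayList<>();
        for (int r = 0; r < readers; r++) {
            received.add(new ArrayList<>(stream)); // each reader gets the full stream
        }
        return received;
    }

    /** Group mode: the group shares one cursor, so each entry goes to exactly one consumer. */
    static List<List<String>> consumerGroup(List<String> stream, int consumers) {
        List<List<String>> received = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            received.add(new ArrayList<>());
        }
        int groupCursor = 0; // shared "last delivered" position of the group
        while (groupCursor < stream.size()) {
            // Assumption: consumers take turns; a real group serves whoever asks first.
            received.get(groupCursor % consumers).add(stream.get(groupCursor));
            groupCursor++;
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println("broadcast: " + broadcast(stream, 2));
        System.out.println("group:     " + consumerGroup(stream, 2));
    }
}
```

In the broadcast case both readers end up with all four messages, while in the group case the four messages are split between the two consumers with no overlap.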
This relies mainly on the following Redis Stream commands and their corresponding Jedis methods:
- xadd: append a message to the stream
- xread: read data
- xgroup: create a group
- xreadgroup: read messages as a consumer in a group
Both read commands are used with the block option. Setting block to 0 means the call blocks until a new message arrives. I put this call in a loop: after a message is received and handled, the loop enters the next blocking call, which gives the listening effect.
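The "block, handle, block again" loop can be sketched without Redis by using a BlockingQueue, whose take() waits indefinitely in the same way a read with block set to 0 does. This is only an analogy in plain Java; the class name BlockingListenerSketch and the STOP sentinel are hypothetical and exist only so the demo can terminate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Blocking-read loop modelled with a BlockingQueue; a stand-in for a blocking stream read. */
final class BlockingListenerSketch {

    private static final String STOP = "__stop__"; // hypothetical sentinel that ends the demo

    /** Blocks until a message arrives, handles it, then blocks again. */
    static List<String> listen(BlockingQueue<String> source) {
        List<String> handled = new ArrayList<>();
        try {
            while (true) {
                String message = source.take(); // waits indefinitely, like block(0)
                if (STOP.equals(message)) {
                    return handled;
                }
                handled.add(message); // "handle" the message, then loop back into take()
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return handled;
        }
    }

    /** Starts a listener thread, publishes the messages, and returns what the listener handled. */
    static List<String> run(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> result = new ArrayList<>();
        Thread listener = new Thread(() -> result.addAll(listen(queue)));
        listener.start();
        try {
            for (String m : messages) {
                queue.put(m);
            }
            queue.put(STOP);
            listener.join(); // join makes the listener's writes visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b", "c")));
    }
}
```

The listener thread spends its idle time inside take() rather than polling on a timer, which is the same reason the Redis version needs no busy-wait between messages.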
Implementation code
The demo code is simple and lives in a single class. As long as you have a Redis instance, you can run it directly after changing the configuration in the code; you don't need to create the stream or the group manually.
pom.xml file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>vip.huhailong</groupId>
<artifactId>JRedisMQ</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<jedis.version>4.2.3</jedis.version>
</properties>
<!-- jedis dependency -->
<dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.7</version>
</dependency>
</dependencies>
</project>
Implementation code
package jredismq.test;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;
import redis.clients.jedis.resps.StreamEntry;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
/** Uses Jedis to listen for messages on a Redis Stream. */
public class JedisStreamMQTest {
    private static final Logger logger = LoggerFactory.getLogger(JedisStreamMQTest.class);
    public static void main(String[] args) {
        // Adjust the following values to match your own environment
        String host = "192.168.1.110";
        int port = 6379;
        int timeout = 1000;
        String password = "huhailong";
        int database = 0;
        String streamKeyName = "streamtest";
        String groupName = "testgroup";
        String[] consumerNames = {"huhailong", "xiaohu"};
        String listenerType = "DEFAULT"; // GROUP or DEFAULT
        // Create the Redis connection pool
        JedisPool pool = new JedisPool(new GenericObjectPoolConfig<>(), host, port, timeout, password, database);
        JedisStreamMQTest test = new JedisStreamMQTest();
        test.createGroup(pool, streamKeyName, groupName); // create the group (and the stream if it is missing)
        if ("GROUP".equals(listenerType)) {
            test.listenerByGroup(pool, streamKeyName, groupName, consumerNames); // listen through a group and its consumers
        } else {
            test.listenerDefault(pool, streamKeyName); // listen with plain xread
        }
        new Thread(() -> {
            // Thread 3: writes data to the stream every 500 ms
            Jedis jedis = pool.getResource();
            while (true) {
                try {
                    Thread.sleep(500L);
                    Map<String, String> map = new HashMap<>();
                    map.put("currentTime", LocalDateTime.now().toString());
                    jedis.xadd(streamKeyName, map, XAddParams.xAddParams());
                } catch (Exception e) {
                    logger.error("Failed to write to the stream", e);
                }
            }
        }).start();
    }
    /**
     * Listens through a consumer group. Each message is delivered to only one consumer
     * in the group, so messages are not consumed repeatedly.
     * @param pool Jedis connection pool
     * @param keyName stream name
     * @param groupName group name
     * @param consumerNames consumer names, one per listener thread
     */
    private void listenerByGroup(JedisPool pool, String keyName, String groupName, String... consumerNames) {
        Map<String, StreamEntryID> entryIDMap = new HashMap<>();
        entryIDMap.put(keyName, StreamEntryID.UNRECEIVED_ENTRY);
        // For a simple demonstration no thread pool is used; one thread is started per consumer
        IntStream.range(0, consumerNames.length).forEach(i -> {
            Jedis jedis = pool.getResource(); // borrow a connection for this consumer's thread
            new Thread(() -> {
                while (true) {
                    try {
                        Thread.sleep(500L);
                        // xreadGroup corresponds to the Redis xreadgroup command. block(0) blocks until a message
                        // arrives; UNRECEIVED_ENTRY (">") asks for messages not yet delivered to any consumer
                        List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xreadGroup(groupName, consumerNames[i], XReadGroupParams.xReadGroupParams().block(0), entryIDMap);
                        if (entries == null || entries.isEmpty()) {
                            continue;
                        }
                        // One call may return several entries, so handle all of them
                        for (StreamEntry entry : entries.get(0).getValue()) {
                            logger.info("Thread:-{},result:{}", Thread.currentThread().getName(), JSONObject.toJSONString(entry.getFields()));
                            jedis.xack(keyName, groupName, entry.getID()); // acknowledge so the entry leaves the pending list
                            jedis.xdel(keyName, entry.getID()); // remove the message from the stream
                        }
                    } catch (Exception e) {
                        logger.error("Group listener failed", e);
                    }
                }
            }).start();
        });
    }
    /**
     * Reads with plain xread, without groups or consumers. Every listening thread receives the same messages.
     * @param pool Jedis connection pool
     * @param keyName stream name
     */
    private void listenerDefault(JedisPool pool, String keyName) {
        Map<String, StreamEntryID> entryIDMap = new HashMap<>();
        // LAST_ENTRY ("$") means "only entries added after this call starts". Because "$" is passed on
        // every call, entries added between two calls (for example during the sleep below) are not seen
        entryIDMap.put(keyName, StreamEntryID.LAST_ENTRY);
        // For a simple demonstration no thread pool is used; two threads are started directly
        IntStream.range(0, 2).forEach(i -> {
            new Thread(() -> {
                Jedis jedis = pool.getResource(); // borrow a connection for this thread
                while (true) {
                    try {
                        Thread.sleep(500L);
                        List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xread(XReadParams.xReadParams().block(0), entryIDMap);
                        if (entries == null || entries.isEmpty()) {
                            continue;
                        }
                        for (StreamEntry entry : entries.get(0).getValue()) {
                            logger.info("Thread:-{},result:{}", Thread.currentThread().getName(), JSONObject.toJSONString(entry.getFields()));
                            // Every thread tries to delete the entry; deleting an ID that is already gone removes nothing
                            jedis.xdel(keyName, entry.getID());
                        }
                    } catch (Exception e) {
                        logger.error("Default listener failed", e);
                    }
                }
            }).start();
        });
    }
    /** Creates the consumer group, and the stream itself if it does not exist yet. */
    private void createGroup(JedisPool pool, String keyName, String groupName) {
        // try-with-resources returns the connection to the pool when done
        try (Jedis jedis = pool.getResource()) {
            // LAST_ENTRY ("$") makes the group receive only messages added after it is created. Adjust as needed:
            // an ID of 0 makes the group read all historical messages. The final boolean creates the stream if it does not exist
            jedis.xgroupCreate(keyName, groupName, StreamEntryID.LAST_ENTRY, true);
        } catch (Exception e) {
            // Caught because the group may already exist when this runs again
            logger.warn("Could not create group (it may already exist): {}", e.getMessage());
        }
    }
}
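One detail of plain xread is worth a sketch. The special ID `$` (StreamEntryID.LAST_ENTRY in Jedis) means "entries added after this call starts", so a loop that passes `$` on every call can skip entries that arrive between two calls. Passing the ID of the last entry received avoids this. The following is a minimal in-memory model in plain Java, not Jedis code: the names StreamCursorSketch, readAfter and replay are made up for illustration, and the IDs are plain numbers rather than real stream IDs.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory model of reading with "$" on every call versus tracking the last seen ID. */
final class StreamCursorSketch {

    /** Returns the IDs greater than afterId, mimicking a read that starts after a given entry. */
    static List<Long> readAfter(List<Long> stream, long afterId) {
        List<Long> out = new ArrayList<>();
        for (long id : stream) {
            if (id > afterId) {
                out.add(id);
            }
        }
        return out;
    }

    /** Replays one scenario: entries 2 and 3 arrive while the reader is busy between two reads. */
    static List<Long> replay(boolean useDollar) {
        List<Long> stream = new ArrayList<>();
        List<Long> seen = new ArrayList<>();
        long cursor = 0; // the stream is empty when the reader starts

        stream.add(1L);                         // arrives while the reader is blocked
        seen.addAll(readAfter(stream, cursor)); // the first read returns entry 1
        cursor = seen.get(seen.size() - 1);

        stream.add(2L);                         // both arrive while the reader is busy
        stream.add(3L);

        if (useDollar) {
            long dollar = stream.get(stream.size() - 1); // "$" resolves to the newest ID at call time
            stream.add(4L);                              // arrives while the call is blocked
            seen.addAll(readAfter(stream, dollar));      // only entry 4 is returned
        } else {
            seen.addAll(readAfter(stream, cursor));      // entries 2 and 3 are returned at once
            cursor = seen.get(seen.size() - 1);
            stream.add(4L);
            seen.addAll(readAfter(stream, cursor));      // entry 4 follows on the next read
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("with $ each call:   " + replay(true));
        System.out.println("tracking last seen: " + replay(false));
    }
}
```

In this model the `$` variant never sees entries 2 and 3, while the tracking variant sees all four. Applied to the demo, that would mean giving each listener thread its own ID map and updating it with the ID of the last entry it handled.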
This demo code is fairly simple; you can modify and wrap it to suit your own needs. I am also continuing to explore packaging this approach into a complete project that does not depend on third-party frameworks such as Spring, so that it can be used flexibly. If you find it useful, please give it a like!