当前位置:网站首页>Use jedis to monitor redis stream to realize message queue function
Use jedis to monitor redis stream to realize message queue function
2022-06-26 05:20:00 【Huhailong blog】
brief introduction
Before using SpringBoot To monitor Redis Stream Realize the function of message queue , This time, we are sharing Jedis To do the same thing , But also can continue to expand the function , because Jedis I think it is still more flexible than the previous way . This implementation can use multithreading to listen .
Passed before SpringBoot Realize the article link :
SpringBoot Use in Redis Stream Realize message monitoring
Video demo
Use Jedis Monitor by yourself Redis Stream To achieve the effect of message queue Demo
Realization principle
This time, I will implement monitoring through group and consumer monitoring and mode use xread Native listening for , The difference between them is that if monitoring through groups and consumers is used, it can ensure that messages will only be consumed by the same consumer once , No repeated consumption of messages , Suitable for scenarios requiring data uniqueness , Such as warehousing or other operations . default xread Implementation mode: if there are several threads listening, these threads will receive the same inserted message at the same time , It can be understood as receiving messages by broadcasting .
This is mainly based on Redis Stream The following commands in correspond to Jedis Methods :
- xadd: Create groups
- xread: Reading data
- xgroup: Create groups
- xreadgroup: Read group messages
They are mainly used for reading block attribute , take block Property is set to 0 It means that it will be blocked until a new message is received , Then I put this step into a poll , It implements blocking. After receiving the message, it enters the next blocking , Thus, the monitoring effect is realized .
Implementation code
This time demo The code is simple , Are put into a class , And as long as there is redis You can run it directly after modifying the configuration in the code , You don't need to create... Manually stream Or group operation .
pom.xml file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>vip.huhailong</groupId>
<artifactId>JRedisMQ</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<jedis.version>4.2.3</jedis.version>
</properties>
<!-- jedis dependency -->
<dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.7</version>
</dependency>
</dependencies>
</project>
Implementation code
package jredismq.test;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;
import redis.clients.jedis.resps.StreamEntry;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
/** * Use jedis Implement monitoring stream news */
public class JedisStreamMQTest {
private static final Logger logger = LoggerFactory.getLogger(JedisStreamMQTest.class);
public static void main(String[] args) {
// The following contents shall be modified according to their own conditions
String host = "192.168.1.110";
int port = 6379;
int timeout = 1000;
String password = "huhailong";
int database = 0;
String streamKeyName = "streamtest";
String groupName = "testgroup";
String[]consumerNames = {
"huhailong", "xiaohu"};
String listenerType = "DEFAULT"; //GROUP or DEFAULT
// establish redis Connection pool instance
JedisPool pool = new JedisPool(new GenericObjectPoolConfig<>(),host,port,timeout,password,database);
JedisStreamMQTest test = new JedisStreamMQTest();
test.createGroup(pool,streamKeyName,groupName); // Create groups
if("GROUP".equals(listenerType)){
test.listenerByGroup(pool,streamKeyName,groupName,consumerNames); // Use groups and consumers to listen
}else{
test.listenerDefault(pool,streamKeyName);
}
new Thread(()->{
// Threads 3: Used to write stream data
Jedis jedis = pool.getResource();
while(true) {
try {
Thread.sleep(500L);
Map<String,String> map = new HashMap<>();
map.put("currentTime", LocalDateTime.now().toString());
jedis.xadd(streamKeyName,map, XAddParams.xAddParams());
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}).start();
}
/** * Use groups and consumers to listen , This listening ensures that messages are not consumed repeatedly , Because each group and each user will consume messages only once * @param keyName stream name * @param groupName The name of the group * @param consumerNames Consumer name set */
private void listenerByGroup(JedisPool pool, String keyName, String groupName, String...consumerNames){
Map<String, StreamEntryID> entryIDMap = new HashMap<>();
entryIDMap.put(keyName,StreamEntryID.UNRECEIVED_ENTRY);
// The following is a simple demonstration of not using a thread pool , Directly create two threads to illustrate the problem
IntStream.range(0,2).forEach(i->{
Jedis jedis = pool.getResource(); // establish jedis example
new Thread(()->{
while(true){
try{
Thread.sleep(500L);
// Below xreadGroup Method equivalent to redis Medium xreadgroup command , take block The blocking time is set to 0 Indicates that the message is always blocked until it is received , And then up there StreamEntryID Set to receive the latest value
List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xreadGroup(groupName, consumerNames[i], XReadGroupParams.xReadGroupParams().block(0), entryIDMap);
logger.info("Thread:-{},result:{}",Thread.currentThread().getName(), JSONObject.toJSONString(entries.get(0).getValue().get(0).getFields()));
// jedis.xack(keyName,groupName,entries.get(0).getValue().get(0).getID()); // Confirmation message
jedis.xdel(keyName,entries.get(0).getValue().get(0).getID()); // removal message
} catch (Exception e){
logger.error(e.getMessage());
}
}
}).start();
});
}
/** * Do not use the concept of groups and consumers to read , Multiple threads will consume data repeatedly * @param keyName stream name */
private void listenerDefault(JedisPool pool, String keyName){
Map<String, StreamEntryID> entryIDMap = new HashMap<>();
entryIDMap.put(keyName,StreamEntryID.LAST_ENTRY);
// The following is a simple demonstration of not using a thread pool , Directly create two threads to illustrate the problem
IntStream.range(0,2).forEach(i->{
new Thread(()->{
Jedis jedis = pool.getResource(); // establish jedis example
while(true){
try{
Thread.sleep(500L);
List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xread(XReadParams.xReadParams().block(0), entryIDMap);
logger.info("Thread:-{},result:{}",Thread.currentThread().getName(), JSONObject.toJSONString(entries.get(0).getValue().get(0).getFields()));
jedis.xdel(keyName,entries.get(0).getValue().get(0).getID());
} catch (Exception e){
logger.error(e.getMessage());
}
}
}).start();
});
}
private void createGroup(JedisPool pool, String keyName, String groupName){
Jedis jedis = pool.getResource();
try{
//StreamEntryID Means to create a group and receive new messages , Here you can set it according to your own needs ,0 Indicates that all historical messages are read , hinder boolean Value indicates if stream Does not exist create stream
jedis.xgroupCreate(keyName,groupName,StreamEntryID.LAST_ENTRY,true);
} catch (Exception e){
// The reason why the exception is caught here is that the group may already exist when it is created
logger.error(e.getMessage());
}
}
}
Ben demo The code is relatively simple , You can modify and encapsulate according to your own needs . I am also continuing to explore the use of this method to package and improve into a complete project , Does not rely on third-party frameworks such as Spring Project , So you can use it flexibly , If you find it useful, please give me some praise !
边栏推荐
- UWB ultra high precision positioning system architecture
- Setting pseudo static under fastadmin Apache
- 【ARM】在NUC977上搭建基于boa的嵌入式web服务器
- 定位设置水平,垂直居中(多种方法)
- 【上采样方式-OpenCV插值】
- [geek] product manager training camp
- Why does the mobile IM based on TCP still need to keep the heartbeat alive?
- data = self._ data_ queue. get(timeout=timeout)
- cartographer_optimization_problem_2d
- Schematic diagram of UWB ultra high precision positioning system
猜你喜欢

Leetcode114. 二叉树展开为链表

zencart新建的URL怎么重写伪静态

LeetCode_二叉搜索树_简单_108.将有序数组转换为二叉搜索树

Learn from small samples and run to the sea of stars

localStorage浏览器本地储存,解决游客不登录的情况下限制提交表单次数。

Introduction to GUI programming to game practice (I)

Decipher the AI black technology behind sports: figure skating action recognition, multi-mode video classification and wonderful clip editing

Introduction to alluxio

Fedora alicloud source

Schematic diagram of UWB ultra high precision positioning system
随机推荐
vscode config
Excellent learning ability is your only sustainable competitive advantage
The wechat team disclosed that the wechat interface is stuck with a super bug "15..." The context of
【红队】要想加入红队,需要做好哪些准备?
Codeforces Round #800 (Div. 2)
How does P2P technology reduce the bandwidth of live video by 75%?
Schematic diagram of UWB ultra high precision positioning system
Practical cases | getting started and mastering tkinter+pyinstaller
国务院发文,完善身份认证、电子印章等应用,加强数字政府建设
两步处理字符串正则匹配得到JSON列表
[latex] error type summary (hold the change)
Codeforces Round #802 (Div. 2)(A-D)
二次bootloader关于boot28.asm应用的注意事项,28035的
PHP one sentence Trojan horse
Leetcode513.找出树的左下角的值
Official image acceleration
ECCV 2020 double champion team, take you to conquer target detection on the 7th
Computer Vision Tools Chain
Recursively traverse directory structure and tree presentation
Introduction to GUI programming to game practice (I)