当前位置:网站首页>Practical dry goods: deploy mini version message queue based on redis6.0
Practical dry goods: deploy mini version message queue based on redis6.0
2022-07-04 11:23:00 【Java notes shrimp】
Technical research background
As the current R & D team is in the initial stage of the company , There is no mature operation and maintenance system yet , For the common maturity in the market MQ Insufficient construction and maintenance capacity , But I hope to have a lightweight messaging system for the members of the R & D team , Therefore, technical research work related to this aspect has been carried out .
After relevant technical research , The decision to choose is based on Redis Implement the message system .
Specific reasons for technical selection :
There are already related structures within the team Redis service , And have certain operation and maintenance capability , It can save technical cost
The industry has a lot about Redis Technical articles on building message systems
The overall throughput of the current system is not high , The main purpose of accessing message system is only to realize the decoupling between systems
For the convenience of readers from 0 To 1 Learn this content carefully , I will start with the link construction .
The construction of the basic environment
be based on redis6.0.6 Build a simple message queue system in version . The deployment environment :
docker run -p 6379:6379 --name redis_6_0_6 -d redis:6.0.6
Parameter interpretation : -d Background start -p Port mapping -name Container name
If there is no relevant local image , You can try to pull the image by setting up the following command :
docker pull redis:6.0.6
When redis After the basic environment of is configured , And then it's based on redis Some built-in basic functions develop a message queue component .
Next, I will introduce how to implement a lightweight message queue in three different technical solutions .
The message queue is implemented based on the conventional queue structure
This implementation is relatively simple , Mainly based on Redis Inside List Structure to the ground , The sender writes the message from the left side of the queue , Then the consumer reads from the right side of the queue .
package org.idea.mq.redis.framework.mq.list;
import com.alibaba.fastjson.JSON;
import org.idea.mq.redis.framework.bean.MsgWrapper;
import org.idea.mq.redis.framework.mq.IMQTemplate;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @Author linhao
* @Date created in 3:09 Afternoon 2022/2/7
*/
@Component
public class RedisListMQTemplate implements IMQTemplate {
@Resource
private IRedisService iRedisService;
@Override
public boolean send(MsgWrapper msgWrapper) {
try {
String json = JSON.toJSONString(msgWrapper.getMsgInfo());
iRedisService.lpush(msgWrapper.getTopic(),json);
return true;
}catch (Exception e){
e.printStackTrace();
}
return false;
}
}
Problem thinking
Here are a few questions to consider :
How to subscribe to the same message among multiple services
Here I suggest that you can prefix the project name of the system + Business identity to organize .
for example : The user needs to publish a message in the system Member has been upgraded To the downstream system , At this point, you can write this message to a file named :user-service:member-upgrade-list Of List Collection .
If the order system wants to access the messages of the user system , You need to in redis Of key Specified in user-service:member-upgrade-list keyword .
How to implement the message listening mechanism ?
about List The message can be obtained by polling , For example, the following case code :
/**
* Get data by polling
*
* @param msgWrapper
*/
private void pollingGet(MsgWrapper msgWrapper) {
while (true) {
String value = iRedisService.rpop(msgWrapper.getTopic());
if (!StringUtils.isEmpty(value)) {
System.out.println(value);
}
// Reduce access pressure , Sleep regularly for a period of time
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
But the polling method consumes performance , So try to use Redis Blocking eject command for , For example, the following method is used to monitor the triggering behavior of messages :
/**
* Get data by blocking
*/
private void blockGet(MsgWrapper msgWrapper) {
while (true) {
List<String> values = iRedisService.brpop(msgWrapper.getTopic());
if (!CollectionUtils.isEmpty(values)) {
values.forEach(value -> {
System.out.println(value);
});
}
}
}
How to ensure reliable transmission of messages ?
When designing message queues , What we attach great importance to is the reliability guarantee of the message . When a message is sent to the consumer , If there is an exception , I hope that the message can be re sent .
For the design of this scenario, we can try to use BRPOPLPUSH This Directive , This instruction can help us in Redis When data is ejected internally, it is written to another backup queue , In this way, even if the pop-up message consumption fails , There is also a backup message in the backup queue that can be used , And the operations of ejecting and writing the backup queue are Redis The interior is encapsulated , External calls can be considered as an atomic operation .
Whether the broadcast mode can be supported ?
from List From the implementation principle of set ,Redis Pop up elements can only be returned to one client link , Therefore, the realization of broadcast effect cannot be supported .
Implement message queue based on publish and subscribe function
Redis A function called publish and subscribe is provided inside , adopt subscibe Command and publish Directives can help us implement the functions of message publishing and notification .
Use subscibe/publish The effect and List The biggest difference in the structure lies in its transmission mode :
list It is more about point-to-point transmission (P2P The way )
subscibe/publish It is possible to communicate with subscribers by means of broadcasting
publish Part of the case code :
@Override
public boolean publish(String channel, String content) {
try (Jedis jedis = iRedisFactory.getConnection()) {
jedis.publish(channel, content);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
subscibe Part of the code :
@Override
public boolean subscribe(JedisPubSub jedisPubSub, String... channel) {
try (Jedis jedis = iRedisFactory.getConnection()) {
jedis.subscribe(jedisPubSub, channel);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
The listening part can achieve this effect by opening an additional thread :
@Component
public class RedisSubscribeMQListener implements IMQListener {
@Resource
private IRedisService iRedisService;
class TestChannel extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
super.onMessage(channel, message);
System.out.println("channel " + channel + " Message received :" + message);
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
channel, subscribedChannels));
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
channel, subscribedChannels));
}
}
// All channel messages are monitored
@Override
public void onMessageReach(MsgWrapper msgWrapper) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
iRedisService.subscribe(new TestChannel(), msgWrapper.getTopic());
}
});
thread.start();
}
}
it is to be noted that , When you call back the notification, you need to inject a JedisPubSub The object of , The interior of this object defines the processing operation after receiving the message .
Problem thinking
How to ensure the reliable transmission of messages ?
adopt subscibe/publish The message processed has no persistence feature , In the event of a network outage ,Redis Abnormal times such as downtime will lead to message loss , And there is no good mechanism to support the problem of repeated consumption of messages . Therefore, the reliability is poor .
be based on Stream Implement message queuing
Redis5.0 Released in Stream type , Also used to implement a typical message queue . It provides message persistence and primary / secondary replication functions , Any client can access data at any time , And remember the location of each client's access , It also ensures that the message is not lost . The Stream The emergence of types , Almost all the content of message queue is satisfied , Including but not limited to :
news ID The serialization generation of
Message traversal
Blocking and non blocking reads of messages
Packet consumption of messages
Processing of unfinished messages
Message queuing monitoring
About Stream Some basic introductory chapters of are not introduced here , Interested friends can read this article :
https://xie.infoq.cn/article/cdb47caddc5ff49dc09ea58cd
In the next section, let's go directly to Redis XStream Relevant practical links .
Encapsulate the message listening function
The first is to define a MQ Associated interface :
public interface RedisStreamListener {
/**
* Process normal messages
*/
HandlerResult handleMsg(StreamEntry streamEntry);
}
Next is the implementation of message sending based on this set of interfaces :
package org.idea.mq.redis.framework.listener;
import com.alibaba.fastjson.JSON;
import org.idea.mq.redis.framework.bean.HandlerResult;
import org.idea.mq.redis.framework.config.StreamListener;
import org.idea.mq.redis.framework.mq.xstream.RedisStreamMQListener;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.idea.mq.redis.framework.utils.PayMsg;
import redis.clients.jedis.StreamEntry;
import javax.annotation.Resource;
import java.util.Map;
import static org.idea.mq.redis.framework.config.MQConstants.SUCCESS;
/**
* @Author linhao
* @Date created in 10:07 Afternoon 2022/2/9
*/
@StreamListener(streamName = "order-service:order-payed-stream", groupName = "order-service-group", consumerName = "user-service-consumer")
public class OrderPayedListener implements RedisStreamMQListener {
@Resource
private IRedisService iRedisService;
@Override
public HandlerResult handleMsg(StreamEntry streamEntry) {
Map<String, String> map = streamEntry.getFields();
String json = map.get("json");
PayMsg payMsg = JSON.parseObject(json, PayMsg.class);
System.out.println("pending payMsg is : " + payMsg);
return SUCCESS;
}
}
Custom message annotations
package org.idea.mq.redis.framework.config;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;
/**
* @Author linhao
* @Date created in 10:04 Afternoon 2022/2/9
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface StreamListener {
String streamName() default "";
String groupName() default "";
String consumerName() default "";
}
There is a custom in the code @StreamListener Annotations , The inside of this annotation contains a @Component Annotations , Objects that use this annotation can be injected into Spring In the container .
In order to automatically assemble these initialization classes , You also need to add a configured object , The code is as follows :
package org.idea.mq.redis.framework.config;
import org.idea.mq.redis.framework.bean.HandlerResult;
import org.idea.mq.redis.framework.mq.xstream.RedisStreamMQListener;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.StreamPendingEntry;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import static org.idea.mq.redis.framework.config.MQConstants.SUCCESS;
/**
* @Author linhao
* @Date created in 3:25 Afternoon 2022/2/7
*/
@Configuration
public class StreamListenerConfiguration implements ApplicationListener<ApplicationReadyEvent> {
@Resource
private ApplicationContext applicationContext;
@Resource
private IRedisService iRedisService;
private static Logger logger = LoggerFactory.getLogger(StreamListenerConfiguration.class);
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
Map<String, RedisStreamMQListener> beanMap = applicationContext.getBeansOfType(RedisStreamMQListener.class);
beanMap.values().forEach(redisStreamMQListener -> {
StreamListener StreamListener = redisStreamMQListener.getClass().getAnnotation(StreamListener.class);
ListenerInitWrapper listenerInitWrapper = new ListenerInitWrapper(StreamListener.streamName(), StreamListener.groupName(), StreamListener.consumerName());
Thread handleThread = new Thread(new CoreMsgHandlerThread(listenerInitWrapper, redisStreamMQListener, iRedisService));
Thread pendingHandleThread = new Thread(new PendingMsgHandlerThread(listenerInitWrapper, redisStreamMQListener, iRedisService));
handleThread.start();
pendingHandleThread.start();
logger.info("{} load successed ", redisStreamMQListener);
});
}
class PendingMsgHandlerThread implements Runnable {
private ListenerInitWrapper listenerInitWrapper;
private RedisStreamMQListener redisStreamMQListener;
private IRedisService iRedisService;
public PendingMsgHandlerThread(ListenerInitWrapper listenerInitWrapper, RedisStreamMQListener redisStreamMQListener, IRedisService iRedisService) {
this.redisStreamMQListener = redisStreamMQListener;
this.listenerInitWrapper = listenerInitWrapper;
this.iRedisService = iRedisService;
}
@Override
public void run() {
String startId = "0-0";
while (true) {
List<StreamPendingEntry> streamConsumersInfos = iRedisService.xpending(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(startId), 1);
// If the collection is not empty , Then the monitoring behavior is triggered
if (!CollectionUtils.isEmpty(streamConsumersInfos)) {
for (StreamPendingEntry streamConsumersInfo : streamConsumersInfos) {
StreamEntryID streamEntryID = streamConsumersInfo.getID();
// More than current pending Of streamId Small 1
String streamIdStr = streamEntryID.toString();
String[] items = streamIdStr.split("-");
Long timestamp = Long.valueOf(items[0]) - 1;
String beforeId = timestamp + "-" + "0";
List<Map.Entry<String, List<StreamEntry>>> result = iRedisService.xreadGroup(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(beforeId), 1, listenerInitWrapper.getConsumerName());
for (Map.Entry<String, List<StreamEntry>> streamInfo : result) {
List<StreamEntry> streamEntries = streamInfo.getValue();
for (StreamEntry streamEntry : streamEntries) {
try {
// Business processing
HandlerResult handlerResult = redisStreamMQListener.handleMsg(streamEntry);
if (SUCCESS.equals(handlerResult)) {
startId = streamEntryID.toString();
iRedisService.xack(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(startId));
}
} catch (Exception e) {
logger.error("[PendingMsgHandlerThread] e is ", e);
}
}
}
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class CoreMsgHandlerThread implements Runnable {
private ListenerInitWrapper listenerInitWrapper;
private RedisStreamMQListener redisStreamMQListener;
private IRedisService iRedisService;
public CoreMsgHandlerThread(ListenerInitWrapper listenerInitWrapper, RedisStreamMQListener redisStreamMQListener, IRedisService iRedisService) {
this.redisStreamMQListener = redisStreamMQListener;
this.listenerInitWrapper = listenerInitWrapper;
this.iRedisService = iRedisService;
}
@Override
public void run() {
while (true) {
List<Map.Entry<String, List<StreamEntry>>> streamConsumersInfos = iRedisService.xreadGroup(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), StreamEntryID.UNRECEIVED_ENTRY, 1, listenerInitWrapper.getConsumerName());
for (Map.Entry<String, List<StreamEntry>> streamInfo : streamConsumersInfos) {
List<StreamEntry> streamEntries = streamInfo.getValue();
for (StreamEntry streamEntry : streamEntries) {
// Business processing
try {
HandlerResult result = redisStreamMQListener.handleMsg(streamEntry);
if (SUCCESS.equals(result)) {
iRedisService.xack(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), streamEntry.getID());
}
} catch (Exception e) {
logger.error("[CoreMsgHandlerThread] e is ", e);
}
}
}
}
}
}
}
The principle is Spring After the container is started , monitor Spring From the inside of the container ApplicationReadyEvent event , Monitor the event , And start two background threads for processing redis Inside stream data .
Encapsulate relevant message publishing functions
The sending part of the message is relatively simple , Directly through redis Go to stream Write data in it
package org.idea.mq.redis.framework.producer;
/**
* @Author linhao
* @Date created in 12:23 Afternoon 2022/2/10
*/
public interface IStreamProducer {
/**
* Appoint streamName Release the news
* @param streamName
* @param json
*/
void sendMsg(String streamName, String json);
}
The message transmission format is json Write as string to redis Inside stream among .
package org.idea.mq.redis.framework.producer;
import org.idea.mq.redis.framework.redis.IRedisService;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
* @Author linhao
* @Date created in 12:19 Afternoon 2022/2/10
*/
public class StreamProducer implements IStreamProducer{
@Resource
private IRedisService iRedisService;
@Override
public void sendMsg(String streamName,String json){
Map<String,String> map = new HashMap<>();
map.put("json",json);
iRedisService.xAdd(streamName,map);
}
}
Be careful , When writing to the bottom layer , I'm using Redis Automatically generated internally ID Serial number , The code is as follows :
@Override
public boolean xAdd(String streamName, Map<String, String> stringMap) {
try (Jedis jedis = iRedisFactory.getConnection()) {
jedis.xadd(streamName, StreamEntryID.NEW_ENTRY, stringMap);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
For convenience, it is used as a SpringBoot Of starter Components are used by external team members , We can encapsulate it into a starter Components :
Testing of components
Point to point sending test
Establish two sets of microservice projects ,user-service
and order-service
, among user-service
Deploy two service nodes , Of the same genus user-service-group
.order-service
Also deploy two service nodes , Of the same genus order-service-group
.
Finally, the two micro service clusters publish the messages subscribed by each other , Check whether it can be accepted normally , And only one node in the same group receives messages at a time .
Broadcast transmission test
Use the previously built user-service modular , Deploy four nodes , Subscribe to the same stream queue , But put it groupName Set to different properties , Finally release the news , Check that all four nodes can receive normally .
The test template has been established in the existing project for specific details , Interested friends can read mq-redis-test Part of the module .
Problem thinking
Why the same StreamName Dual thread consumption is required ?
A thread is used to accept Stream Internal normal data , If the business is processed normally, it will be returned as ack The signal , Confirm that the message has been successfully consumed . If an exception occurs during processing , No reverse return ACK The signal , here Redis The message will be put into Pending In line , The second thread is dedicated to Pending Data inside the queue . If in the Pending The second consumption of the status message still fails , Regular polling will be carried out .
Whether delay retry is supported
In fact, the current design has always had shortcomings , For example, when the message consumption is abnormal, it will enter polling , In severe cases, it may lead to an endless cycle of message consumption , And it keeps clogging up . Not yet implemented, similar to RocketMQ The kind of interval 1,3,5... Minutes of timed delivery consumption failure messages are all functional . Interested partners can make simple modifications based on existing code .
This article complete code case address
https://gitee.com/IdeaHome_admin/mq-framework
recommend :
Main stream Java Advanced technology ( Learning material sharing )
PS: Because the official account platform changed the push rules. , If you don't want to miss the content , Remember to click after reading “ Looking at ”, Add one “ Star standard ”, In this way, each new article push will appear in your subscription list for the first time . spot “ Looking at ” Support us !
边栏推荐
- VPS installation virtualmin panel
- Elevator dispatching (pairing project) ①
- OSI model notes
- QQ one click cookie acquisition
- Notes on writing test points in mind mapping
- Digital simulation beauty match preparation -matlab basic operation No. 6
- Introduction to canoe automatic test system
- Summary of collection: (to be updated)
- iptables导致Heartbeat脑裂
- Login operation (for user name and password)
猜你喜欢
What if the book written is too popular? Author of "deep reinforcement learning" at Peking University: then open the download
Replace() function
Canoe - the third simulation project - bus simulation - 3-1 project implementation
JMeter Foundation
QQ group collection
Reptile learning 4 winter vacation series (3)
How to create a new virtual machine
Reptile learning winter vacation series (2)
Canoe - the second simulation project -xvihicle1 bus database design (operation)
Canoe - the second simulation engineering - xvehicle - 2panel design (principle, idea)
随机推荐
Climb Phoenix Mountain on December 19, 2021
QQ get group settings
Canoe: what is vtsystem
Test question bank management system - database design [easy to understand]
Canoe-the second simulation project-xvehicle-1 bus database design (idea)
VPS installation virtualmin panel
Object. Assign () & JS (= >) arrow function & foreach () function
Summary of collection: (to be updated)
Heartbeat error attempted replay attack
Locust installation
(August 10, 2021) web crawler learning - Chinese University ranking directed crawler
Aike AI frontier promotion (2.14)
XMIND installation
Xiaobing · beauty appraisal
os. Path built-in module
2020 Summary - Magic year, magic me
Solaris 10网络服务
Dos and path
Post man JSON script version conversion
本地Mysql忘记密码的修改方法(windows)