ActiveMQ message component: publish/subscribe and message redelivery (ReDelivery)
2022-06-29 08:22:00 【蜗牛的小牛】
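ActiveMQ has a solid pedigree: it is an Apache project and one of the most popular and capable open-source message buses. It is a messaging component built entirely on the JMS specification, and it integrates easily with mainstream frameworks such as Spring and Spring Boot. This article uses a Spring Boot integration to show a producer sending messages, a consumer receiving them, and message redelivery.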
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>lab-32-activemq-demo</artifactId>
    <dependencies>
        <!-- Auto-configuration support for ActiveMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <!-- Used for the unit test later on -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Note: the producer below imports com.alibaba.fastjson.JSONObject, so the
             com.alibaba:fastjson dependency must also be declared; the original post omits it -->
    </dependencies>
</project>
Message producer: ActiveMqProducer
package com.story.storyadmin.mq.active;

import com.alibaba.fastjson.JSONObject;
import com.story.storyadmin.mq.vo.MessageVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

/**
 * @Description TODO
 * @Author huangd
 * @Date 2020-08-11
 **/
@Service
public class ActiveMqProducer {

    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    public void syncSend(Integer id) {
        MessageVo messageVo = new MessageVo();
        messageVo.setMessage("hello world");
        messageVo.setMsgId(123456L);
        for (int i = 0; i < 5; i++) {
            jmsTemplate.convertAndSend("first_queue", JSONObject.toJSONString(messageVo));
        }
    }
}
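The producer depends on a MessageVo class that this post never shows. Below is a minimal sketch of what it might look like, assuming it is a plain bean with just the two properties the producer sets (message and msgId). The field types are inferred from the setter calls above, and anything beyond those two properties would be a guess. In the project it would live in the com.story.storyadmin.mq.vo package; the package line is left out here so the sketch compiles on its own.

```java
// Hypothetical reconstruction of MessageVo; the real class is not shown in the post.
// Only the two properties used by ActiveMqProducer are modelled.
public class MessageVo {

    // Business id of the message; the producer passes a long literal (123456L)
    private Long msgId;

    // Message payload; the producer passes a String
    private String message;

    public Long getMsgId() {
        return msgId;
    }

    public void setMsgId(Long msgId) {
        this.msgId = msgId;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}
```

The public getters are what let a JSON library such as fastjson serialize the object's properties when the producer calls JSONObject.toJSONString(messageVo).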
Message consumer: Customer
package com.story.storyadmin.mq.active;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

/**
 * @Description TODO
 * @Author huangd
 * @Date 2020-08-11
 **/
@Service
public class Customer {

    @JmsListener(destination = "first_queue", concurrency = "2")
    public void customer(String message) {
        System.out.println("thread=" + Thread.currentThread().getId() + " receive msg is: " + message);
    }
}
StoryAdminApplication.java
package com.story.storyadmin;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;

@SpringBootApplication
public class StoryAdminApplication extends SpringBootServletInitializer {

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
        return builder.sources(this.getClass());
    }

    public static void main(String[] args) {
        SpringApplication.run(StoryAdminApplication.class, args);
    }
}
application.yml
spring:
  activemq:
    broker-url: tcp://<your-VM-address>:61616
    user: admin
    password: admin
Unit test class
package com.story.storyadmin;

import com.story.storyadmin.mq.active.ActiveMqProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.IOException;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = StoryAdminApplication.class)
public class StoryAdminApplicationTests {

    @Autowired
    private ActiveMqProducer activeMqProducer;

    @Test
    public void contextLoads() throws IOException {
        activeMqProducer.syncSend(101);
        // Block here so the listener threads have time to consume the messages
        System.in.read();
    }
}

The output shows two threads consuming messages. That is because the listener method is declared with @JmsListener(destination="first_queue", concurrency = "2").
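As a rough analogy for why two thread ids show up in the output, the sketch below drains a queue of five messages with two worker threads using only the JDK. It is not how Spring's listener container is implemented; the class name TwoWorkerQueueSketch and the method consume are made up for illustration.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-JDK analogy (an assumption-laden sketch, not Spring or ActiveMQ code):
// several workers compete for messages on one queue, so each message is handled
// by exactly one of them, just as with competing consumers on a JMS queue.
final class TwoWorkerQueueSketch {

    /** Consumes all messages with the given number of workers; returns the ids of the threads that handled at least one. */
    static Set<Long> consume(List<String> messages, int workers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        Set<Long> threadIds = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(messages.size());
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                String msg;
                // poll() returns null once the queue is empty, which ends this worker
                while ((msg = queue.poll()) != null) {
                    long id = Thread.currentThread().getId();
                    threadIds.add(id);
                    System.out.println("thread=" + id + " receive msg is: " + msg);
                    done.countDown();
                }
            });
        }
        try {
            done.await(); // wait until every message has been handled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        } finally {
            pool.shutdown();
        }
        return threadIds;
    }
}
```

Calling TwoWorkerQueueSketch.consume(messages, 2) with five messages prints five lines carrying at most two distinct thread ids. Which thread handles which message is not deterministic.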
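That is the normal consumption path. If an exception is thrown while a message is being consumed, ActiveMQ redelivers the message by default.
The official documentation lists the following cases in which a message is redelivered: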
- A transacted session is used and rollback() is called.
- A transacted session is closed before commit() is called.
- A session is using CLIENT_ACKNOWLEDGE and Session.recover() is called.
- A client connection times out (perhaps the code being executed takes longer than the configured time-out period).
Official documentation: Message Redelivery and DLQ Handling
Modify Customer as follows to simulate a processing failure:
@Service
public class Customer {

    @JmsListener(destination = "first_queue", concurrency = "2")
    public void customer(String message) {
        throw new RuntimeException("customer error");
        // System.out.println("thread=" + Thread.currentThread().getId() + " receive msg is: " + message);
    }
}
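Run the test again and you can see the message being retried. With Spring Boot's auto-configuration (entry class ActiveMQAutoConfiguration), a failed message is redelivered 6 times by default, one second apart. These values match the defaults of ActiveMQ's RedeliveryPolicy (maximumRedeliveries = 6, initialRedeliveryDelay = 1000 ms).
If you do not want the retry policy that Spring Boot's auto-configuration gives you, define your own beans and register them in the Spring container, as follows: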
ActiveMqConfig.java
package com.story.storyadmin.mq.active;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

import javax.annotation.Resource;

/**
 * @Description TODO
 * @Author huangd
 * @Date 2020-08-13
 **/
@Configuration
public class ActiveMqConfig {

    @Resource
    private ActiveMQProperties activeMQProperties;

    @Bean
    public RedeliveryPolicy redeliveryPolicy() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        // Redeliver at most twice
        redeliveryPolicy.setMaximumRedeliveries(2);
        // Initial redelivery delay: 2 seconds
        redeliveryPolicy.setInitialRedeliveryDelay(2000L);
        return redeliveryPolicy;
    }

    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory(RedeliveryPolicy redeliveryPolicy) {
        ActiveMQConnectionFactory activeMQConnectionFactory =
                new ActiveMQConnectionFactory(activeMQProperties.getUser(), activeMQProperties.getPassword(), activeMQProperties.getBrokerUrl());
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        return activeMQConnectionFactory;
    }

    @Bean
    public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory jmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
        jmsListenerContainerFactory.setConcurrency("2");
        jmsListenerContainerFactory.setConnectionFactory(activeMQConnectionFactory);
        // 4 = ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (an ActiveMQ-specific acknowledge mode)
        jmsListenerContainerFactory.setSessionAcknowledgeMode(4);
        return jmsListenerContainerFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate(activeMQConnectionFactory);
        // 4 = ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE
        jmsTemplate.setSessionAcknowledgeMode(4);
        return jmsTemplate;
    }
}
ActiveMqProducer.java
package com.story.storyadmin.mq.active;

import com.alibaba.fastjson.JSONObject;
import com.story.storyadmin.mq.vo.MessageVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

/**
 * @Description TODO
 * @Author huangd
 * @Date 2020-08-11
 **/
@Service
public class ActiveMqProducer {

    @Autowired
    private JmsTemplate jmsTemplate;

    public void syncSend(Integer id) {
        MessageVo messageVo = new MessageVo();
        messageVo.setMessage("hello world " + id);
        messageVo.setMsgId(123456L);
        for (int i = 0; i < 5; i++) {
            jmsTemplate.convertAndSend("first_queue", JSONObject.toJSONString(messageVo));
        }
    }
}
Customer.java
package com.story.storyadmin.mq.active;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * @Description TODO
 * @Author huangd
 * @Date 2020-08-11
 **/
@Service
public class Customer {

    @JmsListener(destination = "first_queue", containerFactory = "defaultJmsListenerContainerFactory")
    public void customer(final TextMessage text, Session session) throws JMSException {
        try {
            String message = text.getText();
            System.out.println("thread=" + Thread.currentThread().getId() + " receive msg is: " + message);
            // Simulate a processing failure
            throw new RuntimeException("customer error");
            // text.acknowledge();
        } catch (Exception e) {
            // The message was never acknowledged; recover() asks for it to be redelivered
            session.recover();
        }
    }
}
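Add @EnableConfigurationProperties({ ActiveMQProperties.class}) to the Application startup class so that ActiveMQProperties takes effect. This is needed because defining your own ActiveMQConnectionFactory bean makes ActiveMQAutoConfiguration, which would otherwise register the properties class, back off.
If a message still has not been consumed successfully after the maximum number of redeliveries, it is moved to the dead-letter queue ActiveMQ.DLQ.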
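The relationship between the maximum number of redeliveries, the total number of delivery attempts, and the dead-letter queue can be sketched with a small plain-Java simulation. This is not ActiveMQ code: the class RedeliverySketch, its deliver method, and the deadLetters list are hypothetical stand-ins, and the waiting time between attempts is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simulation only (hypothetical names): models "first delivery + N redeliveries,
// then dead-letter", which is the behaviour described in the text above.
final class RedeliverySketch {

    /** Messages that exhausted their redeliveries; stands in for ActiveMQ.DLQ. */
    final List<String> deadLetters = new ArrayList<>();

    /**
     * Delivers one message to the listener, redelivering after each failure.
     * Returns the total number of delivery attempts (first delivery plus redeliveries).
     */
    int deliver(String message, int maximumRedeliveries, Consumer<String> listener) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.accept(message);
                return attempts; // consumed successfully
            } catch (RuntimeException e) {
                int redeliveriesSoFar = attempts - 1;
                if (redeliveriesSoFar >= maximumRedeliveries) {
                    deadLetters.add(message); // give up and dead-letter the message
                    return attempts;
                }
                // A real client would wait for the configured redelivery delay here
            }
        }
    }
}
```

Under this model, a listener that always throws sees a message 3 times with setMaximumRedeliveries(2) as in ActiveMqConfig, and 7 times with the default of 6, before the message ends up in the dead-letter list.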