Basic use of ActiveMQ in Message Oriented Middleware
2022-06-25 07:49:00 【Jack_ Chen】
ActiveMQ implementation steps
1. Create a ConnectionFactory; it needs a user name, a password, and the broker address.
2. Use the ConnectionFactory to create a Connection.
3. Use the Connection to create a Session.
4. Use the Session to create a Destination. In point-to-point mode the Destination is a queue (Queue); in Pub/Sub mode it is a topic (Topic).
5. Use the Session to create the objects that send and receive messages (MessageProducer and MessageConsumer).
6. Send the message.
7. Close the resources.
Point-to-point mode
Overview of point-to-point mode
Point-to-point mode involves three roles: the message queue (Queue), the sender (Sender), and the receiver (Receiver).
Each message is sent to a specific queue, and receivers take messages from that queue. The queue holds messages until they are consumed or they expire.
For example, if a producer sends 10 messages to the ActiveMQ server and several consumers are listening on the queue, those consumers share the 10 messages between them; each message is received by exactly one consumer.
Point-to-point features
1. Each message has only one consumer (Consumer); once it is consumed, it is no longer in the queue.
2. There is no timing dependency between sender and receiver: whether or not the receiver is running when the sender sends a message, the message still reaches the queue.
3. After successfully receiving a message, the receiver must acknowledge it to the queue.
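The sharing behaviour described above can be modelled without a broker. The sketch below is a plain in-memory simulation, not ActiveMQ code: PointToPointSketch and distribute are hypothetical names, and the strict round-robin hand-out is an assumption made for illustration (a real broker's dispatch order depends on its configuration). It only shows that removing a message from the queue on receipt guarantees a single consumer per message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory model of a point-to-point queue; it does not use ActiveMQ or JMS.
final class PointToPointSketch {

    // Puts messageCount messages on one queue and lets consumerCount consumers
    // take turns polling it. Returns the messages each consumer received.
    static List<List<String>> distribute(int messageCount, int consumerCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= messageCount; i++) {
            queue.add("message " + i);
        }
        List<List<String>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<>());
        }
        int next = 0;
        String message;
        // poll() removes the head of the queue, so a message can never reach a second consumer.
        while ((message = queue.poll()) != null) {
            received.get(next).add(message);
            next = (next + 1) % consumerCount; // assumed round-robin hand-out
        }
        return received;
    }

    public static void main(String[] args) {
        List<List<String>> received = distribute(10, 3);
        for (int c = 0; c < received.size(); c++) {
            System.out.println("consumer " + c + " got " + received.get(c));
        }
    }
}
```

With 10 messages and 3 consumers, every message appears in exactly one consumer's list, which is the property the feature list above describes.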
Add the dependency
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
Producer
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
    private static final String BROKERURL = "tcp://127.0.0.1:61616";
    private static final String QUEUE = "myQueue";

    public static void main(String[] args) throws JMSException {
        // Create the connection factory with the default user name and password
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
        // Create the connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // Start the connection
        connection.start();
        // Session: a single-threaded context for sending and receiving messages
        // First argument: false = non-transacted, true = transacted (messages are sent on commit)
        // Session.AUTO_ACKNOWLEDGE: messages are acknowledged automatically
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the queue
        Queue queue = session.createQueue(QUEUE);
        // MessageProducer: the message producer
        MessageProducer producer = session.createProducer(queue);
        // Use non-persistent delivery
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        // Send the messages
        send(producer, session);
        System.out.println("Sent successfully!");
        session.close();
        connection.close();
    }

    public static void send(MessageProducer producer, Session session) throws JMSException {
        for (int i = 1; i <= 5; i++) {
            System.out.println("I am message " + i);
            // Create a text message
            TextMessage textMessage = session.createTextMessage("I am message " + i);
            // Send the message through the producer
            producer.send(textMessage);
        }
    }
}
Consumer
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
    private static final String BROKERURL = "tcp://127.0.0.1:61616";
    private static final String QUEUE = "myQueue";

    public static void main(String[] args) throws JMSException {
        // Create the connection factory and the connection
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        // Start the connection
        connection.start();
        // Non-transacted session with automatic acknowledgement
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the queue
        Queue queue = session.createQueue(QUEUE);
        // MessageConsumer: the message consumer
        MessageConsumer consumer = session.createConsumer(queue);
        while (true) {
            // receive() blocks until a message arrives; it returns null only if the consumer is closed
            TextMessage textMessage = (TextMessage) consumer.receive();
            if (textMessage != null) {
                System.out.println("Received message: " + textMessage.getText());
            } else {
                break;
            }
        }
        session.close();
        connection.close();
    }
}
Publish-subscribe mode
Overview of publish-subscribe mode
Publish-subscribe mode involves three roles: the topic (Topic), the publisher (Publisher), and the subscriber (Subscriber).
The publisher sends messages to a Topic, and the system delivers those messages to multiple subscribers.
Features
1. Each message can have multiple consumers.
2. There is a timing dependency between publishers and subscribers. A subscriber to a topic (Topic) can only consume messages published after it subscribed, and it must stay running to keep consuming.
3. To relax this strict timing dependency, JMS lets subscribers create a durable subscription. With one, a subscriber still receives the publisher's messages even if it was not active (running) when they were sent.
4. If a message may be processed by no consumer, by one consumer, or by several consumers, the Pub/Sub model is a good fit.
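The first two features above (every subscriber gets its own copy, and a non-durable subscriber misses whatever was published before it subscribed) can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not ActiveMQ code: TopicSketch, subscribe, and publish are hypothetical names, and durable subscriptions are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of a topic with non-durable subscribers; it does not use ActiveMQ or JMS.
final class TopicSketch {
    private final List<List<String>> subscribers = new ArrayList<>();

    // Registers a subscriber and returns its inbox.
    List<String> subscribe() {
        List<String> inbox = new ArrayList<>();
        subscribers.add(inbox);
        return inbox;
    }

    // Delivers the message to every subscriber that is registered right now.
    void publish(String message) {
        for (List<String> inbox : subscribers) {
            inbox.add(message);
        }
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch();
        List<String> early = topic.subscribe();
        topic.publish("message 1");
        List<String> late = topic.subscribe(); // subscribed after "message 1" was published
        topic.publish("message 2");
        System.out.println("early subscriber: " + early);
        System.out.println("late subscriber: " + late);
    }
}
```

The early subscriber ends up with both messages, the late one only with the second. A durable subscription would correspond to the topic keeping undelivered messages for a known subscriber while it is offline.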
Add the dependency
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
Producer
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
    private static final String BROKERURL = "tcp://127.0.0.1:61616";
    private static final String TOPIC = "myTopic";

    public static void main(String[] args) throws JMSException {
        // Create the connection factory with the default user name and password
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
        // Create the connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // Start the connection
        connection.start();
        // Session: a single-threaded context for sending and receiving messages
        // First argument: false = non-transacted, true = transacted (messages are sent on commit)
        // Session.AUTO_ACKNOWLEDGE: messages are acknowledged automatically
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the topic (instead of a queue)
        Topic topic = session.createTopic(TOPIC);
        // MessageProducer: the message producer
        MessageProducer producer = session.createProducer(topic);
        // Use non-persistent delivery
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        // Send the messages
        send(producer, session);
        System.out.println("Sent successfully!");
        session.close();
        connection.close();
    }

    public static void send(MessageProducer producer, Session session) throws JMSException {
        for (int i = 1; i <= 5; i++) {
            System.out.println("I am message " + i);
            // Create a text message
            TextMessage textMessage = session.createTextMessage("I am message " + i);
            // Send the message through the producer
            producer.send(textMessage);
        }
    }
}
Consumer
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
    private static final String BROKERURL = "tcp://127.0.0.1:61616";
    private static final String TOPIC = "myTopic";

    public static void main(String[] args) throws JMSException {
        // Create the connection factory and the connection
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        // Start the connection
        connection.start();
        // Non-transacted session with manual (client) acknowledgement
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        // Create the topic (instead of a queue)
        Topic topic = session.createTopic(TOPIC);
        // MessageConsumer: the message consumer
        MessageConsumer consumer = session.createConsumer(topic);
        while (true) {
            // receive() blocks until a message arrives; it returns null only if the consumer is closed
            TextMessage textMessage = (TextMessage) consumer.receive();
            if (textMessage != null) {
                System.out.println("Received message: " + textMessage.getText());
                // Acknowledge manually
                textMessage.acknowledge();
            } else {
                break;
            }
        }
        session.close();
        connection.close();
    }
}
Message reliability mechanism
ActiveMQ message acknowledgement: a client has successfully received a message once that message has been acknowledged.
Acknowledgement works differently for the two kinds of session: transacted and non-transacted.
Transacted session
If the session is transacted and the transaction commits successfully, the messages are acknowledged automatically. If the transaction rolls back, the messages are delivered again.
Producer
// true = transacted session; the acknowledge mode argument is ignored for transacted sessions
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Create a text message (i is the loop counter of the send() loop shown earlier)
TextMessage textMessage = session.createTextMessage("I am message " + i);
// Send the message through the producer
producer.send(textMessage);
// Commit the transaction; the message is only delivered to the destination on commit
session.commit();
Consumer
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
TextMessage textMessage = (TextMessage) consumer.receive();
if (textMessage != null) {
    System.out.println("Received message: " + textMessage.getText());
    // Commit the transaction to acknowledge the message; without a commit it is redelivered
    session.commit();
}
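The commit and rollback behaviour of a transacted consumer described above can be modelled in a few lines. The sketch below is an in-memory simulation and not the ActiveMQ implementation: TransactedSessionSketch and its methods are hypothetical names, and putting rolled-back messages back at the head of the queue in their original order is a simplifying assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory model of a transacted consumer session; it does not use ActiveMQ or JMS.
final class TransactedSessionSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    // Messages received since the last commit or rollback.
    private final List<String> uncommitted = new ArrayList<>();

    void send(String message) {
        queue.addLast(message);
    }

    // Hands out the next message but remembers it until commit() is called.
    String receive() {
        String message = queue.pollFirst();
        if (message != null) {
            uncommitted.add(message);
        }
        return message;
    }

    // Commit acknowledges everything received in the current transaction.
    void commit() {
        uncommitted.clear();
    }

    // Rollback returns the received messages to the head of the queue, in their original order.
    void rollback() {
        for (int i = uncommitted.size() - 1; i >= 0; i--) {
            queue.addFirst(uncommitted.get(i));
        }
        uncommitted.clear();
    }

    int pending() {
        return queue.size();
    }

    public static void main(String[] args) {
        TransactedSessionSketch session = new TransactedSessionSketch();
        session.send("message 1");
        session.send("message 2");
        System.out.println("first attempt: " + session.receive());
        session.rollback(); // processing failed, so the message goes back
        System.out.println("redelivered: " + session.receive());
        session.commit(); // processing succeeded, so the message is gone for good
        System.out.println("still pending: " + session.pending());
    }
}
```

The point of the model is that a received message stays "owned" by the transaction: only commit() removes it permanently, while rollback() makes it available for delivery again.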
Non-transacted session
In a non-transacted session, how messages are acknowledged depends on the acknowledge mode the session was created with.
1. Session.AUTO_ACKNOWLEDGE
Session.AUTO_ACKNOWLEDGE: messages are acknowledged automatically.
2. Session.CLIENT_ACKNOWLEDGE
Session.CLIENT_ACKNOWLEDGE: the client acknowledges manually by calling acknowledge().
Producer
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create a text message (i is the loop counter of the send() loop shown earlier)
TextMessage textMessage = session.createTextMessage("I am message " + i);
// Send the message through the producer
producer.send(textMessage);
Consumer
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
TextMessage textMessage = (TextMessage) consumer.receive();
if (textMessage != null) {
    System.out.println("Received message: " + textMessage.getText());
    // Acknowledge manually
    textMessage.acknowledge();
}
3. Session.DUPS_OK_ACKNOWLEDGE
Session.DUPS_OK_ACKNOWLEDGE: the session acknowledges lazily, so a message may be delivered more than once.
When a message is redelivered, it counts as successfully consumed only after it has been acknowledged.
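Because DUPS_OK_ACKNOWLEDGE allows duplicate delivery, the consumer has to tolerate seeing the same message twice. One common way to do that is to remember the IDs of messages already handled. The sketch below is a minimal illustration with hypothetical names (IdempotentConsumerSketch, handle); it keeps the IDs in memory only, which is an assumption that would not survive a restart. In a real JMS consumer the ID could come from Message.getJMSMessageID().

```java
import java.util.HashSet;
import java.util.Set;

// Duplicate-tolerant consumer sketch; message IDs are plain strings here, no JMS involved.
final class IdempotentConsumerSketch {
    private final Set<String> seenIds = new HashSet<>();
    private int processed = 0;

    // Processes the message once per ID. Returns true if it was processed,
    // false if the ID had been seen before and the message was skipped.
    boolean handle(String messageId, String body) {
        if (!seenIds.add(messageId)) {
            return false; // duplicate delivery, ignore it
        }
        processed++;
        System.out.println("processing " + messageId + ": " + body);
        return true;
    }

    int processedCount() {
        return processed;
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        consumer.handle("ID:1", "I am message 1");
        consumer.handle("ID:1", "I am message 1"); // redelivered copy is skipped
        consumer.handle("ID:2", "I am message 2");
        System.out.println("processed " + consumer.processedCount() + " distinct messages");
    }
}
```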
Integrating ActiveMQ with Spring
Add the dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
        <version>5.13.4</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>5.1.8.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.1.8.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>5.1.8.RELEASE</version>
    </dependency>
    <!-- activemq-client pulls in the JMS 1.x API; Spring 5.x needs the JMS 2.x API -->
    <dependency>
        <groupId>javax.jms</groupId>
        <artifactId>javax.jms-api</artifactId>
        <version>2.0.1</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13</version>
    </dependency>
</dependencies>
XML configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <context:component-scan base-package="cn.ybzy"></context:component-scan>
    <!-- The ConnectionFactory that actually produces Connections, supplied by the JMS provider -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
    </bean>
    <!-- The Spring ConnectionFactory that wraps and manages the real one -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- The target ConnectionFactory: the real factory that can produce JMS Connections -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>
    <!-- Spring's JMS helper class, used to send and receive messages -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- This connectionFactory is the Spring-provided ConnectionFactory defined above -->
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    <!-- Point-to-point mode -->
    <bean id="textDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue"/>
    </bean>
    <!-- Publish-subscribe mode -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic" />
    </bean>
    <!-- Listener class -->
    <bean id="myMessageListener" class="cn.ybzy.MyMessageListener"></bean>
    <!-- Message listener container; it starts together with Spring -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="textDestination" />
        <property name="messageListener" ref="myMessageListener" />
    </bean>
</beans>
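Producer
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    @Autowired
    private JmsTemplate jmsTemplate;
    // Resolved by field name: injects the "textDestination" queue bean from the XML configuration
    @Autowired
    private Destination textDestination;

    public void sendTextMessage(final String text) {
        jmsTemplate.send(textDestination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(text);
            }
        });
    }
}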
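Consumer listener
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyMessageListener implements MessageListener {
    public void onMessage(Message message) {
        // The producer above only sends text messages, so the cast is safe here
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("Consumer received message: " + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}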
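Test
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring_jms.xml")
public class TestProducer {
    @Autowired
    private Producer producer;

    @Test
    public void testSend() {
        for (int i = 0; i < 100; i++) {
            producer.sendTextMessage("message " + i);
        }
    }
}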
Integrating ActiveMQ with Spring Boot
Add the dependencies
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.8.RELEASE</version>
</parent>
<!-- Spring Boot starter for ActiveMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- Spring Boot starter for sending mail -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-mail</artifactId>
</dependency>
Producer
# ActiveMQ connection settings
spring:
  activemq:
    broker-url: tcp://localhost:61616
    in-memory: true
    pool:
      enabled: false
# Queue name
messages:
  queue: mail_queue
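Sending a message
import javax.jms.Destination;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

@Service("registerMailboxProducer")
public class RegisterMailboxProducer {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    // Sends the JSON string to the given destination
    public void send(Destination destination, String json) {
        jmsMessagingTemplate.convertAndSend(destination, json);
    }
}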
Building and sending the message content
@Value("${messages.queue}")
private String MESSAGES_QUEUE;

@Override
public void regist(User user) {
    // TODO registration logic; the email is sent after registration succeeds
    // Specify the queue
    Destination activeMQQueue = new ActiveMQQueue(MESSAGES_QUEUE);
    // Build the message content
    String mailMessage = mailMessage(user.getEmail(), user.getUserName());
    log.info("### regist() registration email message content: {}", mailMessage);
    // Send the message
    registerMailboxProducer.send(activeMQQueue, mailMessage);
}

// Builds the JSON message: a header carrying the interface type and a content object carrying the mail data
private String mailMessage(String email, String userName) {
    JSONObject root = new JSONObject();
    JSONObject header = new JSONObject();
    header.put("interfaceType", "sms_mail");
    JSONObject content = new JSONObject();
    content.put("mail", email);
    content.put("userName", userName);
    root.put("header", header);
    root.put("content", content);
    return root.toJSONString();
}
Consumer
spring:
  application:
    name: message
  activemq:
    broker-url: tcp://localhost:61616
    in-memory: true
    pool:
      enabled: false
  mail:
    host: smtp.163.com
    username: XXX@163.com
    # Placeholder: use the mailbox's authorization code here
    password: Authorization code
Message distribution interface
public interface MessageAdapter {
    /**
     * Distributes a message.
     * @param jsonObject the message content
     */
    public void distribute(JSONObject jsonObject);
}
Listening for messages and distributing them
@Slf4j
@Component
public class ConsumerDistribute {
    @Autowired
    private SMSMailboxService smsMailboxService;

    @JmsListener(destination = "mail_queue")
    public void distribute(String json) {
        log.info("### Received a message, content json:{}", json);
        if (StringUtils.isEmpty(json)) {
            return;
        }
        JSONObject jsonObject = JSON.parseObject(json);
        JSONObject header = jsonObject.getJSONObject("header");
        String interfaceType = header.getString("interfaceType");
        MessageAdapter messageAdapter = null;
        switch (interfaceType) {
            // Send an email
            case "sms_mail":
                messageAdapter = smsMailboxService;
                break;
            default:
                break;
        }
        // Unknown interface type: skip the message instead of failing with a NullPointerException
        if (messageAdapter == null) {
            log.warn("### No adapter for interfaceType:{}", interfaceType);
            return;
        }
        JSONObject content = jsonObject.getJSONObject("content");
        messageAdapter.distribute(content);
    }
}
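As more interface types are added, the switch in the listener above grows with them. One alternative is a lookup table from interface type to handler. The sketch below shows only that dispatch idea with standard-library types: DispatchSketch, register, and dispatch are hypothetical names, and the handlers take a plain String instead of the JSONObject used in the article.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Table-driven dispatch sketch; plain strings stand in for the JSON content.
final class DispatchSketch {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    // Associates an interface type such as "sms_mail" with the code that handles it.
    void register(String interfaceType, Consumer<String> handler) {
        handlers.put(interfaceType, handler);
    }

    // Invokes the matching handler. Returns false when no handler is registered,
    // so the caller can log and skip the message.
    boolean dispatch(String interfaceType, String content) {
        Consumer<String> handler = handlers.get(interfaceType);
        if (handler == null) {
            return false;
        }
        handler.accept(content);
        return true;
    }

    public static void main(String[] args) {
        DispatchSketch dispatcher = new DispatchSketch();
        dispatcher.register("sms_mail", content -> System.out.println("sending mail for " + content));
        System.out.println("handled: " + dispatcher.dispatch("sms_mail", "user@example.com"));
        System.out.println("handled: " + dispatcher.dispatch("unknown_type", "ignored"));
    }
}
```

In the Spring Boot setup from the article, each MessageAdapter bean could play the role of a handler, registered under its interface type.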
Mail sending implementation
@Slf4j
@Service
public class SMSMailboxService implements MessageAdapter {
    @Autowired
    private JavaMailSender mailSender;
    @Value("${spring.mail.username}")
    private String EMAIL_NAME;

    @Override
    public void distribute(JSONObject jsonObject) {
        String mail = jsonObject.getString("mail");
        String userName = jsonObject.getString("userName");
        log.info("### Consumer received message... mail:{},userName:{}", mail, userName);
        // Build and send the email
        SimpleMailMessage message = new SimpleMailMessage();
        message.setFrom(EMAIL_NAME);
        message.setTo(mail);
        message.setSubject("Congratulations on your successful registration.");
        message.setText("Congratulations " + userName + ", you are now a new user of XXX!");
        log.info("### Sending email to mail:{}", mail);
        mailSender.send(message);
    }
}
Test
http://127.0.0.1:8762/regist
{
"userName":" The small white ",
"email":"[email protected]"
}
