当前位置:网站首页>ActiveMQ -- JDBC code of persistent mechanism
ActiveMQ -- JDBC code of persistent mechanism
2022-07-25 09:17:00 【Why don't you laugh】
Coding test
Be sure to turn on persistence !!!
messageProducer.setDeliverMode(DeliveryMode.PERSISTENT);
queue
producer
public class JmsProduceJDBC {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String USERNAME = "admin";
public static final String PASSWORD = "hll123";
public static final String QUEUE_NAME = "jdbc01";
public static void main(String[] args) throws Exception {
//1. According to the given url Create connection factory
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,ACTIVEMQ_URL);
// 2. Connect through the factory connection And start
Connection connection = activeMQConnectionFactory.createConnection();
// 3. start-up
connection.start();
// 4. Create a session session
// Two parameters , The first thing , Second sign in
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. Create destination , queue 、 The theme , Here we use queues
Queue queue = session.createQueue(QUEUE_NAME);
// 6. Create the producer of the message
MessageProducer messageProducer = session.createProducer(queue);
/** * Persistence must be set */
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 7. adopt MessageProducer production 3 Messages are sent to the message queue
for (int i = 1; i <= 6; i++) {
//8. Create a message
TextMessage textMessage = session.createTextMessage("msg:" + LocalDateTime.now());
//9. Send a message
messageProducer.send(textMessage);
}
// 10. close resource
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** Message sent to MQ complete **** ");
}
}
production 6 Bar message :

In the database ACTIVEMQ_MSGS In the table , Will generate 6 Data , It is the news of the previous production

consumer
public class JmsConsumerJDBC {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String USERNAME = "admin";
public static final String PASSWORD = "hll123";
public static final String QUEUE_NAME = "jdbc01";
public static void main(String[] args) throws Exception {
// Create connection factory
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
// Create connection connection
Connection connection = activeMQConnectionFactory.createConnection();
// Open the connection
connection.start();
// Create a session session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create a queue , Consistent with the producer
Queue queue = session.createQueue(QUEUE_NAME);
// Create message consumer
MessageConsumer messageConsumer = session.createConsumer(queue);
/** * Method 2: By means of listeners */
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("**** Consumer receives message ****:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read(); // You must add a line of code , Otherwise, the program will run directly down and end
messageConsumer.close();
session.close();
connection.close();
System.out.println("**** Consumer message complete ****");
}
}
Start consumer , Messages that have been produced will be deleted ,mq Console and database data will be consumed


Queue consumption summary :
- When DeliveryMode Set to NON_PERSISTENCE when , Messages are stored in memory
- When DeliveryMode Set to PERSISTENCE when , The message is kept in broker In the corresponding file or database
Once the messages in the queue are consumer Consumption starts from Broker Delete in
The theme
You must start the consumer subscription theme first
consumer
public class JmsConsumerTopicJDBC {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String USERNAME = "admin";
public static final String PASSWORD = "hll123";
public static final String TOPIC_NAME = "topic-jdbc";
public static void main(String[] args) throws Exception {
/** * Persistent topic message subscription , Similar to wechat official account subscription * You need to start the consumer first , After subscribing to the topic , Follow up production theme messages , consumer ( subscriber ) You will receive a message * consumer ( subscriber ) After subscribing to a topic , Whether online or offline , As long as you keep the normal subscription status , Messages produced during this period will be received . Offline users will receive the previous message after being online again */
System.out.println("jdbc-1"); // Simulate subscriber
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("jdbc-1"); // Set up clientId, Indicates the subscriber
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "jdbc-1");
connection.start();
Message message = topicSubscriber.receive();
while (null != message) {
TextMessage textMessage = (TextMessage) message;
System.out.println(" Persistence received topic news :" + textMessage.getText());
message = topicSubscriber.receive();
}
session.close();
connection.close();
}
}
Start consumer :


view the database ,ACTIVEMQ_ACKS Add a new record to the table , Information for the current subscriber

producer
public class JmsProduceTopicJDBC {
public static final String ACTIVEMQ_URL = "tcp://localhost61616";
public static final String USERNAME = "admin";
public static final String PASSWORD = "hll123";
public static final String TOPIC_NAME = "topic-jdbc";
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
// connection You must set a persistent theme before starting
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for (int i = 1; i <= 3; i++) {
TextMessage textMessage = session.createTextMessage("jdbc-msg:" + i);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** Persistent messages are sent to MQ complete **** ");
}
}
Start producer :


view the database :ACTIVEMQ_MSGS Consumption data will be added ,ACTIVEMQ_ACKS Of LAST_ACKED_ID It will be updated as the last consumption message ID
ACTIVEMQ_MSGS Inside topic Messages will not be deleted immediately after consumption , and queue Automatically delete after consumption


A small summary
queue
Production news without consumption , Messages will exist
activemq_msgsIn the table , As long as any consumer consumes these messages , These messages will be deleted immediatelytopic
It is generally after starting the consumer subscription , Then produce news through the producer , Then the message will also exist
activemq_msgsIn the table ,activemq_acksThe table stores consumer subscription informationDevelopment considerations
1.mysql Drive pack ( Or other databases ) And the corresponding database connection pool jar The bag needs to be put in activemq In the catalog lib in
2. The initial configuration is completed , After the database generates the table ,activemq.xml Middle configuration
createTablesOnStartup=false3.BeanFactory not initialized or already closed abnormal
Put the machine name of the operating system with "_" Remove the symbol , Restart the operating system
边栏推荐
- flink sql怎么持久化?
- activemq--可持久化机制之JDBC的journal
- Six storage types in C language: Auto register static extern const volatile
- JS small game source code magic tower breakthrough Download
- This ten-year content industry infrastructure company is actually an invisible Web3 pioneer
- mysql中的数据结果排名
- [stl]stack & queue simulation implementation
- 神经网络学习(1)前言介绍
- The annualization of financial products is 4%. How much profit can you get from buying 10000 yuan a month?
- LabVIEW experiment - temperature detection system (experimental learning version)
猜你喜欢
![[STL]list模拟实现](/img/92/2a78382700c1ebf299c6505d962c9c.png)
[STL]list模拟实现
![[stl]stack & queue simulation implementation](/img/92/c040c0e937e2666ee179189c60a3f2.png)
[stl]stack & queue simulation implementation

Common tool classes under JUC package

What is steel grating?
![[SCADA case] myscada helps VIB company realize the modernization and upgrading of production line](/img/67/b8c397d78a675014b5e08ceefc88dc.png)
[SCADA case] myscada helps VIB company realize the modernization and upgrading of production line

Oracle10g单实例数据库升级到哪个版本好,求建议
![[C language] dynamic memory management, flexible array](/img/da/b9455885df0cb6646908e3655d62c5.png)
[C language] dynamic memory management, flexible array

Arrange the array into the smallest number

Comparison between symmetric encryption and asymmetric encryption

Composition of the interview must ask items
随机推荐
js弹出式城市筛选组件匹配手机移动端
API parsing of JDBC
Leetcode · 83 biweekly race · 6129. Number of all 0 subarrays · mathematics
Robot jumping problem
[arm] Xintang nuc977 transplants wk2124 drive
Django4.0 + Web + MySQL5.7 实现简单登录操作
Redis/Mysql知识概述
Silicon Valley class lesson 11 - official account news and wechat authorization
SQL injection
How to avoid duplicate data when the database is high and distributed
Silicon Valley classroom lesson 15 - Tencent cloud deployment
28. Slot
Oracle10g单实例数据库升级到哪个版本好,求建议
Additional: in the lower division / county (data sheet)
机器人跳跃问题
Illustration leetcode - 1184. Distance between bus stops (difficulty: simple)
[C language] dynamic memory management, flexible array
JS pop-up City filtering component matches mobile terminal
[common tools] obtain system status information based on psutil and gputil
Uniapp intercepts route jumps through addinterceptor to control whether the page needs to log in