Original article: https://blog.csdn.net/kavito/article/details/91403659
Before introducing RabbitMQ, let's look at a scenario from an e-commerce project:
- The original product data is stored in the database, and all create, read, update, and delete operations are performed there.
- The search service reads from an index library (Elasticsearch). If a product in the database changes, the index data is not updated in time.
- Product detail pages have been rendered as static pages, so their data does not change when the product is updated in the database.
If we change a product's price in the back office, the search page and the product detail page still show the old price, which is clearly wrong. How do we solve this?
We might think of the following:
- Option 1: whenever the back office creates, updates, or deletes a product, it also modifies the index data and regenerates the static page.
- Option 2: the search service and the static page service expose operation interfaces, and the back office calls them after creating, updating, or deleting a product.
Both options have a serious problem: code coupling. Search and static page logic has to be embedded in the back-office service, which violates the independence principle of microservices.
So we take a different approach: a message queue!
After the product service creates, updates, or deletes a product, it does not touch the index library or the static pages. It just sends a message to MQ (for example, a message containing the product id) and does not care who receives it. The search service and the static page service listen to MQ, receive the message, and then update the index library and the static product detail page respectively, based on the product id.
What is a message queue
MQ stands for Message Queue. A message queue is a container that holds messages while they are in transit. It follows the classic producer-consumer model: producers keep putting messages into the queue, and consumers keep taking messages out of it. Because producing and consuming are asynchronous, and each side only cares about sending or receiving messages without intruding on the other's business logic, producers and consumers are decoupled.
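The producer-consumer idea can be tried without any broker. The following is a minimal in-memory sketch using only the JDK's `BlockingQueue`; it is not RabbitMQ, and the class name `InMemoryQueueDemo` and the `item-N` message format are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory stand-in for a message queue (not the RabbitMQ API).
class InMemoryQueueDemo {

    // Sends `count` messages from the calling thread and returns what the consumer thread received.
    static List<String> run(final int count) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        final List<String> received = new ArrayList<>();

        // Consumer: blocks on take() until a message is available.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            // Producer: only knows the queue, not who consumes from it.
            for (int i = 0; i < count; i++) {
                queue.put("item-" + i);
            }
            consumer.join(); // join() makes the consumer's writes visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

The producer and the consumer share nothing but the queue, which is the decoupling described above. A real broker adds what this sketch lacks: network transport, persistence, and routing.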
In development, message queues are typically used in the following scenarios:
1. Asynchronous task processing:
In a high-concurrency environment, synchronous processing cannot keep up and requests pile up. For example, a large number of insert and update requests hitting MySQL at the same time causes heavy row-lock and table-lock contention, and the backlog may eventually trigger a "too many connections" error. With a message queue we can process requests asynchronously and relieve pressure on the system: long-running operations that do not need to be handled synchronously are handed to a message receiver through the queue, which shortens the application's response time.
2. Application decoupling:
MQ acts as an intermediary. Producers interact with consumers through MQ, which decouples the applications.
AMQP and JMS
MQ is a model of message communication, not a concrete implementation. There are currently two mainstream ways to implement MQ: AMQP and JMS.
Differences and connections between the two:
- JMS defines a unified interface to standardize message operations; AMQP standardizes the format of data exchange through a protocol.
- JMS is restricted to the Java language; AMQP is only a protocol and does not prescribe an implementation, so it is cross-language.
- JMS specifies two message models; AMQP's message models are richer.
Common MQ products
- ActiveMQ: based on JMS
- RabbitMQ: based on the AMQP protocol, developed in Erlang, good stability
- RocketMQ: based on JMS, an Alibaba product, now donated to the Apache Foundation
- Kafka: distributed messaging system, high throughput
RabbitMQ quick start
RabbitMQ is a message queue developed in Erlang that implements AMQP (Advanced Message Queuing Protocol). It is a way for applications to communicate with each other, and message queues are widely used in distributed system development. RabbitMQ official site: http://www.rabbitmq.com
Download and install
RabbitMQ is developed in Erlang, so you need to install the Erlang environment that matches your RabbitMQ version. I won't go into the details here; please look up a tutorial. RabbitMQ download page: http://www.rabbitmq.com/download.html
How RabbitMQ works
The figure below shows the basic structure of RabbitMQ:
Components:
- Broker: the message queue service process, which consists of two parts: Exchange and Queue.
- Exchange: the message exchange, which routes messages to queues according to certain rules and filters messages.
- Queue: the message queue, where messages are stored. Messages arrive in the queue and are forwarded to the specified consumer.
- Producer: the message producer, i.e. the producer client, which sends messages to MQ.
- Consumer: the message consumer, i.e. the consumer client, which receives the messages forwarded by MQ.
How a producer sends a message:
1. The producer establishes a TCP connection with the Broker.
2. The producer establishes a channel with the Broker.
3. The producer sends the message to the Broker through the channel, and the Exchange takes over forwarding.
4. The Exchange forwards the message to the specified Queue.
How a consumer receives a message:
1. The consumer establishes a TCP connection with the Broker.
2. The consumer establishes a channel with the Broker.
3. The consumer listens on the specified Queue.
4. When a message arrives in the Queue, the Broker pushes it to the consumer by default.
5. The consumer receives the message.
6. The consumer replies with an ack.
Six message models
① Basic message model:
In the model above, there are the following concepts:
- P: the producer, i.e. the program that sends messages.
- C: the consumer, the receiver of messages, which keeps waiting for messages to arrive.
- queue: the message queue, shown in red in the figure. It can buffer messages; producers deliver messages into it and consumers take messages out of it.
Producer
Create a new Maven project and add the amqp-client dependency:
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.1</version>
</dependency>
Connection utility class:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {
    /**
     * Establish a connection to RabbitMQ.
     *
     * @return the connection
     * @throws Exception if the connection cannot be established
     */
    public static Connection getConnection() throws Exception {
        // Define the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the server address
        factory.setHost("192.168.1.103");
        // Port
        factory.setPort(5672);
        // Set account information: vhost, username, password.
        // Virtual host: one MQ service can host multiple virtual hosts, each equivalent to an independent MQ
        factory.setVirtualHost("/kavito");
        factory.setUsername("kavito");
        factory.setPassword("123456");
        // Get a connection from the factory
        Connection connection = factory.newConnection();
        return connection;
    }
}
The producer sends a message:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // 1. Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // 2. Create a channel from the connection; all message operations go through the channel
        Channel channel = connection.createChannel();
        // 3. Declare (create) the queue
        // Parameters: String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        /**
         * Parameter details
         * 1. queue: the queue name
         * 2. durable: whether the queue is persistent; if so, the queue still exists after MQ restarts
         * 3. exclusive: whether the queue is exclusive to this connection; it can only be accessed from this connection
         *    and is deleted automatically when the connection closes. Setting it to true can be used to create temporary queues
         * 4. autoDelete: whether the queue is deleted automatically when it is no longer in use. Setting both this
         *    and exclusive to true gives a temporary queue (deleted automatically when unused)
         * 5. arguments: extension parameters of the queue, for example a time-to-live
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 4. Message content
        String message = "Hello World!";
        // Send the message to the specified queue
        // Parameters: String exchange, String routingKey, BasicProperties props, byte[] body
        /**
         * Parameter details:
         * 1. exchange: the exchange; if not specified, the default exchange of MQ is used (set to "")
         * 2. routingKey: the routing key; the exchange forwards the message to a queue according to it.
         *    With the default exchange, routingKey is set to the queue name
         * 3. props: the message properties
         * 4. body: the message content
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        // Close the channel and the connection (resources are best closed in a try-catch-finally statement)
        channel.close();
        connection.close();
    }
}
Console output:
Web management page: server address:port (locally 127.0.0.1:15672; default username and password: guest / guest)
Click the queue name to open the details page, where you can view the message:
The consumer receives the message:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Recv {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Create a channel; all communication between the client and the MQ service goes through the channel
        Channel channel = connection.createChannel();
        // Declare the queue (same parameters as in the producer)
        // Parameters: String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Implement the consume method
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Receive and handle the message. This works like an event listener: it is called automatically when a message arrives
            /**
             * Called when a message is received.
             * @param consumerTag the consumer tag that identifies the consumer; it can be set in channel.basicConsume when listening on the queue
             * @param envelope the envelope, which carries delivery metadata such as the exchange and the delivery tag
             * @param properties the message properties
             * @param body the message content
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Exchange
                String exchange = envelope.getExchange();
                // Message id: MQ uses it to identify the message within the channel; it can be used to acknowledge the message
                long deliveryTag = envelope.getDeliveryTag();
                // body is the message body
                String msg = new String(body, "utf-8");
                System.out.println(" [x] received : " + msg + "!");
            }
        };

        // Listen on the queue; the second parameter controls whether messages are acknowledged automatically
        // Parameters: String queue, boolean autoAck, Consumer callback
        /**
         * Parameter details:
         * 1. queue: the queue name
         * 2. autoAck: automatic acknowledgement. After receiving a message the consumer must tell MQ that it was received.
         *    If true, the acknowledgement is sent to MQ automatically; if false, it must be sent from code
         * 3. callback: the consume method, executed when the consumer receives a message
         */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
Console output:
Now look at the messages in the queue again: the message has been consumed.
We can see that the consumer has received the message, but the program has not stopped. It keeps listening for new messages in the queue, and prints any new message as soon as it arrives.
Message acknowledgement mechanism (ACK)
As the example above shows, once a message is received by the consumer, it is deleted from the queue.
So here is the question: how does RabbitMQ know that the message has been received?
What if the consumer receives the message but crashes before doing anything with it, or throws an exception? Consumption has failed, but RabbitMQ has no way of knowing, so the message is lost!
For this reason RabbitMQ has an ACK mechanism. When the consumer gets a message, it sends an ACK receipt to RabbitMQ to report that the message has been received. There are two kinds of ACK:
- Automatic ACK: as soon as the message is received, the consumer sends the ACK automatically.
- Manual ACK: after the message is received, no ACK is sent; it has to be sent by an explicit call.
Which one is better?
That depends on how important the message is:
- If the message is not important and losing it has no impact, automatic ACK is more convenient.
- If the message is very important and must not be lost, it is best to ACK manually after consumption is complete. With automatic ACK, RabbitMQ deletes the message from the queue right away; if the consumer then goes down, the message is lost.
Our previous tests all used automatic ACK. To ACK manually, we need to change the code:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Recv2 {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Create a channel
        final Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Define the consumer of the queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Receive and handle the message. This works like an event listener: it is called automatically when a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body is the message body
                String msg = new String(body);
                System.out.println(" [x] received : " + msg + "!");
                // Manual ACK
                /*
                 * void basicAck(long deliveryTag, boolean multiple) throws IOException;
                 * deliveryTag: the id that identifies the message
                 * multiple: whether to acknowledge in batch. true: acknowledge, in one call, all messages up to and including deliveryTag
                 */
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // Listen on the queue; the second parameter is false, meaning manual ACK
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}
The last line of code sets the second parameter to false:
channel.basicConsume(QUEUE_NAME, false, consumer);
The problem with automatic ACK
Modify the consumer to throw an exception, as follows:
Leave the producer unchanged and run it directly. The message is sent successfully:
Run the consumer. The program throws an exception:
Management interface:
The consumer threw an exception, but the message was still marked as consumed, even though we never actually processed it.
Demonstrating manual ACK
Run the producer again to send a message:
As before, throw an exception before the manual ack is sent, and run Recv2.
Now look at the management interface:
The message has not been consumed!
There is one more case: modify the consumer Recv2 so that the second parameter of the queue listener is changed from automatic to manual (and remove the exception introduced earlier), but do not ACK manually in the consume method.
Leave the producer code unchanged and run it again:
Run the consumer:
However, the management interface shows:
After stopping the consumer program, it shows:
This is because, although we enabled manual ACK, the code never acknowledges the message, so the message is not really consumed. When we shut down this consumer, the state of the message changes back to Ready.
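The Ready and Unacked states seen in the management interface can be modelled in a few lines. The sketch below is a simplified in-memory model, not the RabbitMQ client API; the class `AckQueueModel` and its methods are hypothetical names chosen for illustration, and requeued messages are simply appended to the end of the ready list.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of a queue's Ready / Unacked bookkeeping (not the RabbitMQ API).
class AckQueueModel {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Delivers the next ready message and returns its delivery tag, or -1 if the queue is empty.
    // With autoAck the broker forgets the message at once; otherwise it is held as Unacked.
    long deliver(boolean autoAck) {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        if (!autoAck) {
            unacked.put(tag, message);
        }
        return tag;
    }

    // Manual acknowledgement: only now is the message really gone.
    void ack(long deliveryTag) {
        unacked.remove(deliveryTag);
    }

    // The consumer disconnects: everything it never acknowledged becomes Ready again.
    void consumerClosed() {
        for (String message : unacked.values()) {
            ready.addLast(message);
        }
        unacked.clear();
    }

    int readyCount() { return ready.size(); }

    int unackedCount() { return unacked.size(); }

    public static void main(String[] args) {
        AckQueueModel queue = new AckQueueModel();
        queue.publish("Hello World!");
        queue.deliver(false);      // manual mode, but the consumer never acks
        queue.consumerClosed();    // consumer stops
        System.out.println("Ready=" + queue.readyCount() + " Unacked=" + queue.unackedCount());
    }
}
```

In this model, a delivery with `autoAck = true` leaves nothing behind even if the consumer fails afterwards, while a manual-mode delivery without `ack` stays Unacked until the consumer disconnects and then returns to Ready, which matches the behaviour observed above.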
The right thing to do is:
Set the second parameter to false when listening on the queue, and ACK manually in the code.
Run the consumer again and check the web management page:
The message is consumed successfully!
How producers avoid data loss: https://www.cnblogs.com/vipstone/p/9350075.html
② Work message model
Work queues, also called the competing consumers pattern
Compared with the introductory program, work queues adds one more consumer: two consumers consume messages from the same queue, but each message can only be obtained by one consumer.
This message model is especially useful in web applications, where complex tasks cannot be completed within a short HTTP request window.
Next, let's simulate the process:
P: the producer, the publisher of tasks
C1: consumer 1, picks up a task and completes it; assume it works slowly (we simulate a time-consuming task)
C2: consumer 2, picks up a task and completes it; assume it works faster
Producer
The producer sends 50 messages in a loop:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {
    private final static String QUEUE_NAME = "test_work_queue";

    public static void main(String[] argv) throws Exception {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Publish tasks in a loop
        for (int i = 0; i < 50; i++) {
            // Message content
            String message = "task .. " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            Thread.sleep(i * 2);
        }
        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}
Consumer 1
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class Recv {
    private final static String QUEUE_NAME = "test_work_queue";

    public static void main(String[] argv) throws Exception {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Create a channel; all communication between the client and the MQ service goes through the channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Implement the consume method
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Receive and handle the message. This works like an event listener: it is called automatically when a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body is the message body
                String msg = new String(body, "utf-8");
                System.out.println(" [consumer 1] received : " + msg + "!");
                // Simulate a task that takes 1s
                try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); }
            }
        };
        // Listen on the queue; the second parameter controls whether messages are acknowledged automatically
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
Consumer 2
The code is not pasted here. It is similar to consumer 1, except that consumer 2 does not simulate any processing time.
Next, start both consumers and then send 50 messages:
You can see that the two consumers each consumed 25 different messages, which achieves task distribution.
The capable do more
Is there a problem with the implementation above?
- Consumer 1 is less efficient than consumer 2; each of its tasks takes a long time.
- Yet both end up consuming the same number of messages.
- Consumer 2 is idle most of the time, while consumer 1 is constantly busy.
The current behaviour distributes tasks evenly. The right behaviour is that the faster a consumer is, the more messages it consumes.
How do we achieve that?
Set prefetchCount = 1 through the BasicQos method. RabbitMQ will then let each consumer process at most 1 message at a time. In other words, until it receives the consumer's ack, it will not dispatch a new message to that consumer; instead, it hands the message to the next consumer that is not busy.
Note that prefetchCount only takes effect with manual ack; it has no effect with automatic ack.
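To see why prefetchCount = 1 changes the split, here is a small deterministic simulation of the two dispatch strategies. It is an in-memory sketch, not RabbitMQ: the class `DispatchSimulation`, the per-task costs, and the assumption that all tasks are already waiting in the queue are illustrative choices. In the Java client, the corresponding setting is a call to `channel.basicQos(1)` before `basicConsume`, combined with manual ack.

```java
// Hypothetical simulation comparing even dispatch with prefetch-style dispatch (not the RabbitMQ API).
class DispatchSimulation {

    // Round-robin: task i goes to consumer i % n, regardless of how busy that consumer is.
    static int[] roundRobin(int tasks, int[] costPerTask) {
        int[] handled = new int[costPerTask.length];
        for (int i = 0; i < tasks; i++) {
            handled[i % costPerTask.length]++;
        }
        return handled;
    }

    // Prefetch = 1: the next task goes to whichever consumer finishes (acks) its current task first.
    static int[] fairDispatch(int tasks, int[] costPerTask) {
        int n = costPerTask.length;
        int[] handled = new int[n];
        long[] freeAt = new long[n]; // simulated time at which each consumer becomes free
        for (int i = 0; i < tasks; i++) {
            int next = 0;
            for (int c = 1; c < n; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            freeAt[next] += costPerTask[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        int[] costs = {1000, 10}; // consumer 1 needs 1000 ms per task, consumer 2 needs 10 ms
        int[] even = roundRobin(50, costs);
        int[] fair = fairDispatch(50, costs);
        System.out.println("round-robin: " + even[0] + " / " + even[1]);
        System.out.println("prefetch=1 : " + fair[0] + " / " + fair[1]);
    }
}
```

With these assumed costs, round-robin gives each consumer 25 tasks, while the prefetch-style strategy gives the slow consumer 1 task and the fast consumer 49. The exact numbers in a real run depend on timing and on how fast the producer publishes.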
Test again:
Subscription models
Notes:
1. One producer, multiple consumers.
2. Each consumer has its own queue.
3. The producer does not send messages directly to a queue, but to an exchange.
4. Each queue needs to be bound to the exchange.
5. A message sent by the producer passes through the exchange and reaches the queues, so that one message can be consumed by multiple consumers.
Example: registration -> send email, send SMS
X (Exchange): on the one hand, the exchange receives messages from producers. On the other hand, it knows how to handle them, for example deliver to one particular queue, deliver to all queues, or discard the message. What it does depends on the type of the Exchange.
There are several Exchange types:
Fanout: broadcast; delivers the message to all queues bound to the exchange
Direct: directed; delivers the message to queues whose routing key matches
Topic: wildcard; delivers the message to queues whose routing pattern matches
Headers: unlike the routing modes, headers mode does not use a routing key; it matches queues by the key/value pairs in the message headers.
The headers mode is not covered here. If you are interested, see this article: https://blog.csdn.net/zhu_tianwei/article/details/40923131
An Exchange is only responsible for forwarding messages and cannot store them. So if no queue is bound to the Exchange, or no queue satisfies the routing rules, the message is lost!
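The difference between fanout and direct routing, and the fact that an exchange with no matching queue drops the message, can be sketched in memory. This is not the RabbitMQ API; `MiniExchange`, its methods, and the queue names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory exchange supporting two types (not the RabbitMQ API).
class MiniExchange {
    enum Type { FANOUT, DIRECT }

    private final Type type;
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>(); // queue name -> binding keys
    private final Map<String, List<String>> queues = new LinkedHashMap<>();  // queue name -> stored messages

    MiniExchange(Type type) {
        this.type = type;
    }

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(queue, q -> new HashSet<>()).add(bindingKey);
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
    }

    // Routes one message and returns how many queues received a copy.
    // A return value of 0 means the message was dropped: the exchange itself stores nothing.
    int publish(String routingKey, String message) {
        int delivered = 0;
        for (Map.Entry<String, Set<String>> binding : bindings.entrySet()) {
            boolean matches = type == Type.FANOUT || binding.getValue().contains(routingKey);
            if (matches) {
                queues.get(binding.getKey()).add(message);
                delivered++;
            }
        }
        return delivered;
    }

    List<String> messages(String queue) {
        List<String> stored = queues.get(queue);
        return stored == null ? Collections.<String>emptyList() : stored;
    }

    public static void main(String[] args) {
        MiniExchange fanout = new MiniExchange(Type.FANOUT);
        fanout.bind("sms_queue", "");
        fanout.bind("email_queue", "");
        System.out.println("fanout copies: " + fanout.publish("", "Registered successfully!"));

        MiniExchange direct = new MiniExchange(Type.DIRECT);
        direct.bind("sms_queue", "sms");
        direct.bind("email_queue", "email");
        System.out.println("direct copies: " + direct.publish("sms", "Registered successfully!"));
    }
}
```

A fanout exchange ignores the routing key and copies the message to every bound queue; a direct exchange copies it only to queues bound with exactly that key.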
③ Publish/subscribe (exchange type: fanout, also known as broadcast)
Publish/subscribe model diagram:
Producer
Differences from the previous two models:
- 1) Declare an Exchange instead of a Queue.
- 2) Send messages to the Exchange instead of to a Queue.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String EXCHANGE_NAME = "test_fanout_exchange";

    public static void main(String[] argv) throws Exception {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the exchange and set its type to fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // Message content
        String message = "Registered successfully!!";
        // Publish the message to the Exchange
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [producer] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
Consumer 1 (on successful registration, the message goes to the SMS service)
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Recv {
    private final static String QUEUE_NAME = "fanout_exchange_queue_sms"; // SMS queue

    private final static String EXCHANGE_NAME = "test_fanout_exchange";

    public static void main(String[] argv) throws Exception {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // Define the consumer of the queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Receive and handle the message. This works like an event listener: it is called automatically when a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body is the message body
                String msg = new String(body);
                System.out.println(" [SMS service] received : " + msg + "!");
            }
        };
        // Listen on the queue with automatic ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
Consumer 2 (on successful registration, the message goes to the email service)
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Recv2 {
    private final static String QUEUE_NAME = "fanout_exchange_queue_email"; // Email queue

    private final static String EXCHANGE_NAME = "test_fanout_exchange";

    public static void main(String[] argv) throws Exception {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // Define the consumer of the queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Receive and handle the message. This works like an event listener: it is called automatically when a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body is the message body
                String msg = new String(body);
                System.out.println(" [email service] received : " + msg + "!");
            }
        };
        // Listen on the queue with automatic ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
We run the two consumers and then send 1 message:
Think about it:
1. What is the difference between publish/subscribe and work queues?
Differences:
1) Work queues does not need an exchange to be declared, while publish/subscribe does.
2) A publish/subscribe producer sends messages to an exchange; a work queues producer sends messages to a queue (under the hood it uses the default exchange).
3) Publish/subscribe requires binding the queue to the exchange; work queues does not, because the queue is in fact bound to the default exchange.
Similarities:
In both modes, multiple consumers listening on the same queue compete for its messages, and no message is consumed twice.
2. Which should be used in practice, publish/subscribe or work queues?
Publish/subscribe is recommended. It is more powerful than work queues (consumers can still compete within the same queue), and it lets you specify your own dedicated exchange.
④ Routing model (exchange type: direct)
Routing model diagram:
P: the producer; sends messages to the Exchange and specifies a routing key when sending.
X: the Exchange; receives messages from the producer and delivers each one to the queues whose binding key exactly matches the routing key.
C1: a consumer whose queue is bound for messages with the routing key error
C2: a consumer whose queue is bound for messages with the routing keys info, error, and warning
Now let's look at the code:
Producer
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {
    private final static String EXCHANGE_NAME = "test_direct_exchange";

    public static void main(String[] argv) throws Exception {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the exchange and set its type to direct
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // Message content
        String message = "Registered successfully! Reply T by SMS to unsubscribe";
        // Send the message with the routing key sms; only the SMS service can receive it
        channel.basicPublish(EXCHANGE_NAME, "sms", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
Consumer 1
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Recv {
    private final static String QUEUE_NAME = "direct_exchange_queue_sms"; // SMS queue
    private final static String EXCHANGE_NAME = "test_direct_exchange";

    public static void main(String[] argv) throws Exception {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Bind the queue to the exchange and specify the routing key. Multiple keys can be bound
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms"); // receive messages the sender published with the routing key sms
        //channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");

        // Define the consumer of the queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Receive and handle the message. This works like an event listener: it is called automatically when a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body is the message body
                String msg = new String(body);
                System.out.println(" [SMS service] received : " + msg + "!");
            }
        };
        // Listen on the queue with automatic ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
Consumer 2
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Recv2 {
    private final static String QUEUE_NAME = "direct_exchange_queue_email"; // Email queue
    private final static String EXCHANGE_NAME = "test_direct_exchange";

    public static void main(String[] argv) throws Exception {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Bind the queue to the exchange and specify the routing key. Multiple keys can be bound
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email"); // receive messages the sender published with the routing key email

        // Define the consumer of the queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Receive and handle the message. This works like an event listener: it is called automatically when a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body is the message body
                String msg = new String(body);
                System.out.println(" [email service] received : " + msg + "!");
            }
        };
        // Listen on the queue with automatic ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
We send sms Of RoutingKey, Find out : Only designated SMS consumers 1 I got the message
⑤Topics Wildcard pattern ( Switch type :topics)
Topics Model diagram :
Each consumer listens to its own queue , And set the routingkey, The producer sent the message to broker, By switch according to routingkey To forward messages to the specified queue .
Routingkey It is usually composed of one or more words , Between many words “.” Division , for example :inform.sms
Wildcard rules :
#
: Match one or more words
*
: It's just that there are not many matches 1 Word
give an example :
audit.#
: Able to match audit.irs.corporate
perhaps audit.irs
audit.*
: Only match audit.irs
In the diagram, all the messages we send describe animals. Each message is sent with a routing key made of three words (two dots). The first word describes the speed, the second the color, and the third the species: "<speed>.<color>.<species>".
We create three bindings: Q1 is bound with "*.orange.*", and Q2 is bound with "*.*.rabbit" and "lazy.#".
Q1 matches all orange animals.
Q2 matches messages about rabbits and messages about lazy animals.
Here is a small exercise. If the producer sends messages with the following routing keys, which queues will they reach?
quick.orange.rabbit: Q1 and Q2 (the message is routed to both queues at the same time)
lazy.orange.elephant: Q1 and Q2
quick.orange.fox: Q1
lazy.pink.rabbit: Q2 (note that although this routing key matches both of Q2's binding keys, the message is delivered to Q2 only once)
quick.brown.fox: matches no queue and is discarded
quick.orange.male.rabbit: matches no queue and is discarded
orange: matches no queue and is discarded
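The wildcard rules and the exercise answers can be checked with a small matcher. This is only a sketch in plain Java, not RabbitMQ's own implementation: the class name TopicMatcher and its matches method are made up for illustration, and the bindings for Q1 and Q2 are the ones listed above.

```java
import java.util.List;

/** Minimal re-implementation of topic matching, for illustration only. */
final class TopicMatcher {

    /** Returns true if the binding key (which may contain * and #) matches the routing key. */
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length;   // pattern used up: every word must be used up too
        }
        if (pattern[p].equals("#")) {
            // '#' either absorbs no word, or absorbs one word and stays in place
            return matches(pattern, p + 1, words, w)
                    || (w < words.length && matches(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false;               // no words left, but the pattern still needs one
        }
        // '*' accepts any single word; anything else must be equal to the word
        return (pattern[p].equals("*") || pattern[p].equals(words[w]))
                && matches(pattern, p + 1, words, w + 1);
    }

    public static void main(String[] args) {
        List<String> q1 = List.of("*.orange.*");
        List<String> q2 = List.of("*.*.rabbit", "lazy.#");
        for (String key : List.of("quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
                "lazy.pink.rabbit", "quick.brown.fox", "quick.orange.male.rabbit", "orange")) {
            boolean toQ1 = q1.stream().anyMatch(b -> matches(b, key));
            boolean toQ2 = q2.stream().anyMatch(b -> matches(b, key));
            System.out.println(key + " -> Q1=" + toQ1 + ", Q2=" + toQ2);
        }
    }
}
```

A queue receives the message if any one of its binding keys matches, which is why lazy.pink.rabbit reaches Q2 only once even though two of its bindings match.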
Let's take the routing key "quick.orange.rabbit" as an example and verify the answer above.
Producer

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String EXCHANGE_NAME = "test_topic_exchange";

    public static void main(String[] argv) throws Exception {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Declare the exchange with type topic
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // Message content
        String message = "This is a fast-moving orange rabbit";
        // Publish the message with the routing key quick.orange.rabbit
        channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, message.getBytes());
        System.out.println(" [Animal description] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
Consumer 1

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Recv {

    private final static String QUEUE_NAME = "topic_exchange_queue_Q1";
    private final static String EXCHANGE_NAME = "test_topic_exchange";

    public static void main(String[] argv) throws Exception {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Bind the queue to the exchange with a binding key: subscribe to all orange animals
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");

        // Define the consumer of the queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Works like an event listener: called automatically whenever a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                // body is the message body
                String msg = new String(body);
                System.out.println(" [Consumer 1] received : " + msg + "!");
            }
        };
        // Listen on the queue with automatic ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
Consumer 2

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Recv2 {

    private final static String QUEUE_NAME = "topic_exchange_queue_Q2";
    private final static String EXCHANGE_NAME = "test_topic_exchange";

    public static void main(String[] argv) throws Exception {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Bind the queue to the exchange with two binding keys:
        // subscribe to messages about rabbits and about lazy animals
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");

        // Define the consumer of the queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Works like an event listener: called automatically whenever a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                // body is the message body
                String msg = new String(body);
                System.out.println(" [Consumer 2] received : " + msg + "!");
            }
        };
        // Listen on the queue with automatic ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
Result: both C1 and C2 receive the message.
⑥ RPC
RPC model diagram:
Basic concepts:
Callback queue: the client sends a request to the server, and the server stores the result once it has processed the request. So that the client can fetch that result, it sends the address of a callback queue, reply_to, along with every request.
Correlation id: the client may send several requests to the server, and once the server has processed them the client cannot tell which request a given response in the callback queue belongs to. To handle this, the client attaches a unique correlation_id property to each request. It can then use the correlation_id value of each response in the callback queue to tell which request the response answers.
Process description:
- When the client starts, it creates an anonymous, exclusive callback queue.
- For an RPC request, the client sends a message with two properties: reply_to, which names the callback queue, and correlation_id, which is set to a unique value for each request.
- The request is sent to an rpc_queue queue.
- The server waits for requests on that queue. When a request arrives, it does the work and sends a message containing the result to the queue named in the reply_to field.
- The client waits for data on the callback queue. When a message arrives, it checks the correlation_id property. If the value matches that of the request, the response is returned to the application.
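The steps above can be tried without a broker. The sketch below is an in-memory simulation in plain Java: the queues are ordinary BlockingQueues, and the names RpcSketch, Message, queue, serveOne, call and roundTrip are all hypothetical. With a real broker, replyTo and correlationId would be properties of the message rather than fields of a hand-written class.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** In-memory stand-in for the RPC pattern: no broker, queues are plain BlockingQueues. */
final class RpcSketch {

    /** A message carrying the two properties the pattern relies on. */
    static final class Message {
        final String replyTo;
        final String correlationId;
        final String body;

        Message(String replyTo, String correlationId, String body) {
            this.replyTo = replyTo;
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Hypothetical "broker": queue name -> queue
    private static final Map<String, BlockingQueue<Message>> QUEUES = new ConcurrentHashMap<>();

    static BlockingQueue<Message> queue(String name) {
        return QUEUES.computeIfAbsent(name, n -> new LinkedBlockingQueue<>());
    }

    /** Server: takes one request from rpc_queue and replies to the queue named in replyTo. */
    static void serveOne() throws InterruptedException {
        Message request = queue("rpc_queue").take();
        String result = "processed: " + request.body;   // the "work"
        queue(request.replyTo).put(new Message(null, request.correlationId, result));
    }

    /** Client: sends a request, then waits for the response with the same correlationId. */
    static String call(String callbackQueue, String body) throws InterruptedException {
        String correlationId = UUID.randomUUID().toString();
        queue("rpc_queue").put(new Message(callbackQueue, correlationId, body));
        while (true) {
            Message response = queue(callbackQueue).take();
            if (correlationId.equals(response.correlationId)) {
                return response.body;
            }
            // a response with a different correlationId belongs to another request: skip it
        }
    }

    /** Runs one client call against a one-shot server thread and returns the reply. */
    static String roundTrip(String body) {
        Thread server = new Thread(() -> {
            try {
                serveOne();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.start();
        try {
            // stands in for the anonymous, exclusive callback queue
            String callbackQueue = "callback-" + UUID.randomUUID();
            // a stale response left over from some earlier request
            queue(callbackQueue).put(new Message(null, "stale-id", "old result"));
            String reply = call(callbackQueue, body);
            server.join();
            return reply;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

The stale message placed on the callback queue shows why correlation_id is needed: the client skips it and returns only the response that matches its own request.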
To finish this part, here are two interview questions:
Interview questions:
How do you avoid a backlog of messages?
1) Use the work queue model, with multiple consumers listening on the same queue.
2) After receiving a message, hand it to a thread pool and process it asynchronously.
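Point 2 can be sketched in plain Java as follows. This only shows the hand-off to a thread pool; the class AsyncConsumerSketch, the pool size of 4 and the method names are assumptions made for the example, and onMessage stands in for whatever callback the client library invokes on delivery.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hands each received message to a thread pool so the receiving thread is never blocked. */
final class AsyncConsumerSketch {

    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    private final AtomicInteger processed = new AtomicInteger();

    /** Called once per delivery; returns immediately, the work runs on the pool. */
    void onMessage(String msg) {
        pool.submit(() -> handle(msg));
    }

    private void handle(String msg) {
        // slow business logic would go here
        processed.incrementAndGet();
    }

    /** Stops accepting work, waits for queued tasks, and returns how many messages were handled. */
    int shutdownAndCount() {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        AsyncConsumerSketch consumer = new AsyncConsumerSketch();
        for (int i = 0; i < 100; i++) {
            consumer.onMessage("msg-" + i);
        }
        System.out.println(consumer.shutdownAndCount());
    }
}
```

With automatic ACK the broker treats a message as handled as soon as it is delivered, so a real consumer that works this way would normally acknowledge manually once the task has finished.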
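How do you avoid losing messages?
1) Use the consumer ACK mechanism, which prevents messages from being lost on the consumer side.
But what if MQ goes down before the consumer has consumed the message? Is the message gone?
2) Persist the messages. Message persistence requires that the queue and the exchange are persistent as well.
Exchange persistence: declare the exchange with durable set to true, for example channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true)
Queue persistence: declare the queue with durable set to true, for example channel.queueDeclare(QUEUE_NAME, true, false, false, null)
Message persistence: publish the message with persistent properties, for example channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes())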
Integrating RabbitMQ with Spring
The following simulates a registration service that, once a user has registered successfully, pushes messages to the SMS and email services.
Setting up the Spring Boot environment
Create two projects, mq-rabbitmq-producer and mq-rabbitmq-consumer, and apply steps 1, 2 and 3 below to each (in this example the consumer is configured with annotations, so it can skip step 3).
1. Add the AMQP starter:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
</dependency>
2. Add the RabbitMQ configuration to application.yml:
server:
  port: 10086
spring:
  application:
    name: mq-rabbitmq-producer
  rabbitmq:
    host: 192.168.1.103
    port: 5672
    username: kavito
    password: 123456
    virtualHost: /kavito
    template:
      retry:
        enabled: true
        initial-interval: 10000ms
        max-interval: 300000ms
        multiplier: 2
      exchange: topic.exchange
    publisher-confirms: true
Property descriptions:
- template: configuration for AmqpTemplate
  - retry: retry on failure
    - enabled: turns retry on failure on
    - initial-interval: the interval before the first retry
    - max-interval: the maximum retry interval; the interval stops growing once it reaches this value
    - multiplier: the factor applied to the interval for the next retry; with 2, each retry waits twice as long as the previous one
  - exchange: the default exchange name; once it is set here, it is used whenever a message is sent without specifying an exchange
- publisher-confirms: the publisher confirm mechanism, which makes sure messages are sent correctly; if sending fails, a failure receipt comes back, which can be used to trigger a retry
Of course, if the consumer only receives messages and never sends them, it does not need the template-related configuration.
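To see what initial-interval, multiplier and max-interval mean in numbers, the sketch below computes the waiting times for the values configured above. It is plain Java written for illustration, not Spring's own code; BackoffSchedule, intervals and the retries parameter are hypothetical names, and the number of retries is an assumption since it is not set in the configuration shown here.

```java
import java.util.ArrayList;
import java.util.List;

/** Computes exponential back-off intervals with an upper bound. */
final class BackoffSchedule {

    /** Returns the wait in milliseconds before each of the first `retries` retries. */
    static List<Long> intervals(long initialMs, double multiplier, long maxMs, int retries) {
        List<Long> result = new ArrayList<>();
        double next = initialMs;
        for (int i = 0; i < retries; i++) {
            result.add((long) Math.min(next, maxMs));
            // grow the interval, but never beyond the configured maximum
            next = Math.min(next * multiplier, maxMs);
        }
        return result;
    }

    public static void main(String[] args) {
        // initial-interval: 10000ms, multiplier: 2, max-interval: 300000ms
        System.out.println(intervals(10_000, 2.0, 300_000, 7));
        // prints [10000, 20000, 40000, 80000, 160000, 300000, 300000]
    }
}
```

The interval doubles from 10 seconds until it would exceed 5 minutes, and from then on every retry waits the full 5 minutes.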
3. Define the RabbitmqConfig configuration class, which configures the exchange, the queues, and the bindings between them.
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {
    public static final String QUEUE_EMAIL = "queue_email"; // email queue
    public static final String QUEUE_SMS = "queue_sms"; // sms queue
    public static final String EXCHANGE_NAME = "topic.exchange"; // topic exchange
    public static final String ROUTINGKEY_EMAIL = "topic.#.email.#";
    public static final String ROUTINGKEY_SMS = "topic.#.sms.#";

    // Declare the exchange
    @Bean
    public Exchange exchange() {
        // durable(true): persistent, the exchange still exists after MQ restarts
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    // Declare the email queue
    /*
     * new Queue(QUEUE_EMAIL, true, false, false)
     * durable = true: persistent, the queue does not have to be recreated when RabbitMQ restarts
     * exclusive: whether the queue is only valid for the current connection; the default is false
     * auto-delete: whether the queue is deleted automatically when it is no longer in use; the default is false
     */
    @Bean
    public Queue emailQueue() {
        return new Queue(QUEUE_EMAIL);
    }

    // Declare the sms queue
    @Bean
    public Queue smsQueue() {
        return new Queue(QUEUE_SMS);
    }

    // Bind the email queue to the exchange with ROUTINGKEY_EMAIL
    @Bean
    public Binding bindingEmail(@Qualifier("emailQueue") Queue queue,
                                @Qualifier("exchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    }

    // Bind the sms queue to the exchange with ROUTINGKEY_SMS
    @Bean
    public Binding bindingSMS(@Qualifier("smsQueue") Queue queue,
                              @Qualifier("exchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    }
}
Producer (mq-rabbitmq-producer)
To make testing easy, I put the producer code directly into the project's test class. It sends messages with the routing key "topic.sms.email", so in mq-rabbitmq-consumer every listener whose queue is bound to the exchange (topic.exchange) with a binding key that matches "topic.sms.email" will receive them.
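import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

// JUnit 4 runner; with JUnit 5, @SpringBootTest alone is enough
@SpringBootTest
@RunWith(SpringRunner.class)
public class Send {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void sendMsgByTopics() {
        /**
         * Parameters:
         * 1. exchange name
         * 2. routingKey
         * 3. message content
         */
        for (int i = 0; i < 5; i++) {
            String message = "Congratulations, registration succeeded! userid=" + i;
            rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "topic.sms.email", message);
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}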
Run the test class to send 5 messages:
In the web management console you can see that the exchange and the two queues, queue_email and queue_sms, have been created, and that 5 messages were delivered to each of the two queues.
Consumer (mq-rabbitmq-consumer)
Write a listener component that uses annotations to configure the queues it consumes and the bindings between those queues and the exchange. (This can also be done with a configuration class, as in the producer.)
Spring AMQP wraps and abstracts the message consumer: any method of a Java bean becomes a consumer once it is annotated with @RabbitListener.
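import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Queue names, exchange name and binding keys mirror RabbitmqConfig in the producer
@Component
public class ReceiveHandler {

    // Listen on the email queue
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue_email", durable = "true"),
            exchange = @Exchange(value = "topic.exchange", type = ExchangeTypes.TOPIC),
            key = {"topic.#.email.#"}))
    public void rece_email(String msg) {
        System.out.println(" [Email service] received : " + msg + "!");
    }

    // Listen on the SMS queue
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue_sms", durable = "true"),
            exchange = @Exchange(value = "topic.exchange", type = ExchangeTypes.TOPIC),
            key = {"topic.#.sms.#"}))
    public void rece_sms(String msg) {
        System.out.println(" [SMS service] received : " + msg + "!");
    }
}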
Annotation descriptions:
- @Component: annotation on the class; registers it in the Spring container
- @RabbitListener: annotation on a method; declares the method as a consumer. It needs the following attributes:
  - bindings: the bindings, of which there can be several. Its value is an array of @QueueBinding, and @QueueBinding has the following attributes:
    - value: the queue this consumer is associated with. Its value is a @Queue, representing one queue
    - exchange: the exchange the queue is bound to. Its value is of type @Exchange
    - key: the routing key that binds the queue to the exchange; more than one can be specified
Start the mq-rabbitmq-consumer project:
OK. Once the email service and the SMS service have received the messages, each can carry on with its own business logic.