How to get started with RabbitMQ

2020-11-08 23:46:00 Fengshuwan River Bridge

Link to the original text: https://blog.csdn.net/kavito/article/details/91403659

Before introducing RabbitMQ, let's first look at a scenario from an e-commerce project:

  • The source data for products is stored in the database, and all create, read, update, and delete operations are performed there.

  • The search service reads from an index library (Elasticsearch). If a product changes in the database, the index data is not updated in time.

  • The product detail pages have been rendered as static pages, so their data does not change when the product is updated in the database.

If we change a product's price in the back office, the search page and the product detail page still show the old price, which is clearly wrong. How can we solve this?

We might think of doing this:

  • Option 1: Whenever the back office adds, deletes, or modifies a product, it also modifies the index library data and regenerates the static page.

  • Option 2: The search service and the static product page service expose operation interfaces, and the back office calls them after adding, deleting, or modifying a product.

Both options have a serious problem: code coupling. Calls to the search service and the product page service have to be embedded in the back-office service, which violates the independence principle of microservices.

At this point we turn to another solution: a message queue!

After the product service adds, deletes, or modifies a product, it does not need to touch the index library or the static pages. It just sends a message to MQ (for example, a message containing the product id) and does not care who receives it. The search service and the static page service listen to MQ, receive the message, and then update the index library and the product detail static page respectively, based on the product id.

What is a message queue

MQ stands for Message Queue. A message queue is a container that holds messages while they are in transit. It is a typical producer-consumer model: the producer keeps producing messages into the queue, and consumers keep taking messages from it. Because production and consumption are asynchronous, and each side only cares about sending or receiving messages without intruding on business logic, producers and consumers are decoupled.
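The producer-consumer idea can be tried without any broker at all. The following is a minimal in-memory sketch, assuming a java.util.concurrent.BlockingQueue as a stand-in for the message queue; the class and method names (QueueSketch, produceAndConsume) are made up for illustration and have nothing to do with RabbitMQ itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory illustration only: a BlockingQueue stands in for the message queue.
class QueueSketch {

    // Hypothetical helper: a producer thread puts `count` messages on the queue
    // while the calling thread consumes them. Returns what was consumed, in order.
    static List<String> produceAndConsume(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        Thread producer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                queue.add("item-" + i); // the producer never calls the consumer directly
            }
        });
        producer.start();

        List<String> consumed = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                consumed.add(queue.take()); // blocks until a message is available
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(3));
    }
}
```

The producer and the consumer only share the queue, which is the decoupling described above. A real MQ adds what this sketch lacks: the two sides can live in different processes and on different machines.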

 

Message queues typically have the following application scenarios in development:

1. Asynchronous task processing:

In a high-concurrency environment, requests tend to pile up because synchronous processing cannot keep up. For example, when a large number of insert and update requests reach MySQL at the same time, they directly cause countless row locks and table locks, and so many requests may accumulate that a "too many connections" error is triggered. With a message queue we can process requests asynchronously and relieve pressure on the system: long-running operations that do not need synchronous handling are handed to the message receiver through the queue and processed asynchronously, which reduces the application's response time.

2. Application decoupling:

MQ acts as an intermediary. Producers interact with consumers through MQ, which decouples the applications.

 

AMQP and JMS

MQ is a model of message communication, not a concrete implementation. There are currently two mainstream ways to implement MQ: AMQP and JMS.

The differences and connections between the two:

  • JMS defines a unified interface to standardize message operations; AMQP standardizes the format of data exchange by defining a protocol.

  • JMS requires the Java language; AMQP is only a protocol and does not prescribe an implementation, so it is cross-language.

  • JMS specifies two message models, while AMQP's message models are richer.

Common MQ products

  • ActiveMQ: based on JMS

  • RabbitMQ: based on the AMQP protocol, developed in Erlang, good stability

  • RocketMQ: based on JMS, an Alibaba product, now handed over to the Apache Foundation

  • Kafka: a distributed messaging system with high throughput

 

RabbitMQ quick start

RabbitMQ is a message queue developed in Erlang that implements AMQP (Advanced Message Queuing Protocol). It is a way for applications to communicate with each other, and message queues are widely used in distributed system development. RabbitMQ official site: http://www.rabbitmq.com

Download and install

Because RabbitMQ is developed in Erlang, you need to install the Erlang environment that corresponds to your RabbitMQ version. That is not covered in detail here; please search for a tutorial. RabbitMQ official download page: http://www.rabbitmq.com/download.html

How RabbitMQ works

The picture below shows the basic structure of RabbitMQ:

Component description:

  • Broker: the message queue service process, which consists of two parts: Exchange and Queue.
  • Exchange: the message exchange, which routes messages to a queue according to certain rules and filters messages.
  • Queue: the message queue, where messages are stored. Messages arrive in the queue and are forwarded to the specified consumer.
  • Producer: the message producer, i.e. the producer client, which sends messages.
  • Consumer: the message consumer, i.e. the consumer client, which receives the messages forwarded by MQ.

How a producer sends a message:

1. The producer establishes a TCP connection with the Broker.

2. The producer establishes a channel with the Broker.

3. The producer sends the message to the Broker through the channel, and the Exchange forwards the message.

4. The Exchange forwards the message to the specified Queue.

How a consumer receives a message:

1. The consumer establishes a TCP connection with the Broker.

2. The consumer establishes a channel with the Broker.

3. The consumer listens to the specified Queue.

4. When a message arrives at the Queue, the Broker pushes it to the consumer by default.

5. The consumer receives the message.

6. The consumer replies with an ack.
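To make the two flows concrete, here is a toy in-memory model of them. It is only a sketch under simplifying assumptions: there are no TCP connections or channels, the routing key is simply the queue name, and every name in it (MiniBroker, listen, publish, ackCount) is invented for this illustration and is not part of the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model of the flows above; none of these names come from the RabbitMQ client.
class MiniBroker {
    // queue name -> listener registered by a consumer (consumer step 3)
    private final Map<String, Consumer<String>> listeners = new HashMap<>();
    // messages waiting in a queue that nobody listens to yet
    private final Map<String, List<String>> stored = new HashMap<>();
    private int acks = 0;

    // Consumer side: listen on a queue; anything already stored is pushed at once.
    void listen(String queue, Consumer<String> listener) {
        listeners.put(queue, listener);
        List<String> backlog = stored.remove(queue);
        if (backlog != null) {
            for (String m : backlog) {
                push(queue, m);
            }
        }
    }

    // Producer side (steps 3 and 4): the exchange forwards the message to the
    // queue named by routingKey, then the broker pushes it to the listener.
    void publish(String routingKey, String message) {
        if (listeners.containsKey(routingKey)) {
            push(routingKey, message);
        } else {
            stored.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(message);
        }
    }

    private void push(String queue, String message) {
        listeners.get(queue).accept(message); // consumer steps 4 and 5
        acks++;                               // consumer step 6: ack reply
    }

    int ackCount() {
        return acks;
    }

    public static void main(String[] args) {
        MiniBroker broker = new MiniBroker();
        broker.publish("simple_queue", "sent before anyone listens");
        broker.listen("simple_queue", m -> System.out.println("received: " + m));
        broker.publish("simple_queue", "sent while listening");
        System.out.println("acks: " + broker.ackCount());
    }
}
```

The point of the sketch is the push behavior in step 4: the consumer registers a listener once and is then called back for each message, including messages that were queued before it started listening.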

Six message models

① Basic message model:

In the model above, there are the following concepts:

  • P: the producer, i.e. the program that sends messages.

  • C: the consumer, the receiver of messages, which waits for messages to arrive.

  • queue: the message queue, the red part of the picture. It can buffer messages; producers deliver messages into it and consumers take messages out of it.

Producer

Create a new Maven project and add the amqp-client dependency:

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.7.1</version>
    </dependency>

Connection utility class:

    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class ConnectionUtil {
        /**
         * Establishes a connection to RabbitMQ.
         * @return the new connection
         * @throws Exception if the connection cannot be established
         */
        public static Connection getConnection() throws Exception {
            // Define the connection factory
            ConnectionFactory factory = new ConnectionFactory();
            // Set the server address
            factory.setHost("192.168.1.103");
            // Port
            factory.setPort(5672);
            // Set the account information: virtual host, user name, password.
            // One MQ service can hold multiple virtual hosts, and each virtual host acts as an independent MQ.
            factory.setVirtualHost("/kavito");
            factory.setUsername("kavito");
            factory.setPassword("123456");
            // Get the connection through the factory
            Connection connection = factory.newConnection();
            return connection;
        }
    }

Producer sends a message:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    public class Send {

        private final static String QUEUE_NAME = "simple_queue";

        public static void main(String[] argv) throws Exception {
            // 1. Get a connection
            Connection connection = ConnectionUtil.getConnection();
            // 2. Create a channel from the connection; all message operations go through the channel
            Channel channel = connection.createChannel();
            // 3. Declare (create) the queue
            // Parameters: String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            /**
             * Parameter details
             * 1. queue: the queue name
             * 2. durable: whether the queue is persistent; if so, the queue still exists after MQ restarts
             * 3. exclusive: whether the queue is exclusive to this connection. The queue can only be accessed from this connection and is deleted automatically when the connection closes. Setting this to true can be used to create temporary queues
             * 4. autoDelete: whether to delete the queue automatically when it is no longer in use. Setting both this parameter and exclusive to true gives a temporary queue (deleted automatically when no longer in use)
             * 5. arguments: extension arguments for the queue, for example a time to live
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 4. Message content
            String message = "Hello World!";
            // Send the message to the specified queue
            // Parameters: String exchange, String routingKey, BasicProperties props, byte[] body
            /**
             * Parameter details:
             * 1. exchange: the exchange; if not specified, MQ's default exchange is used (set to "")
             * 2. routingKey: the routing key, which the exchange uses to forward the message to the specified queue. With the default exchange, set routingKey to the queue name
             * 3. props: the message properties
             * 4. body: the message content
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            // Close the channel and the connection (resources are best closed in a try-catch-finally statement)
            channel.close();
            connection.close();
        }
    }
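The closing comment in the producer recommends try-catch-finally. A more compact alternative in Java 7+ is try-with-resources, which closes resources in reverse order even when an exception is thrown. The sketch below demonstrates the mechanism with stand-in resources so that it runs without a broker; Resource, CloseOrderSketch and run are hypothetical names. As far as I know, Connection and Channel in the 5.x client are AutoCloseable and could be used the same way, but check this against your client version.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in resources; with the real client, Connection and Channel would take their place.
class CloseOrderSketch {

    // Hypothetical resource that records when it is closed.
    static class Resource implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Resource(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add("closed " + name);
        }
    }

    // Opens a "connection" and a "channel", optionally fails mid-way,
    // and returns the sequence of events, including the close order.
    static List<String> run(boolean fail) {
        List<String> log = new ArrayList<>();
        try (Resource connection = new Resource("connection", log);
             Resource channel = new Resource("channel", log)) {
            if (fail) {
                throw new IllegalStateException("publish failed");
            }
            log.add("published");
        } catch (IllegalStateException e) {
            log.add("caught " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In both runs the channel is closed before the connection, and in the failing run both are closed before the catch block executes, which is the behavior the manual close calls in the producer are trying to achieve.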

Console:

Web management page: server address / port (local: 127.0.0.1:15672; default user name and password: guest / guest)

Click the queue name to open the details page, where you can inspect the message:

Consumer receives the message

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    import java.io.IOException;

    public class Recv {
        private final static String QUEUE_NAME = "simple_queue";

        public static void main(String[] argv) throws Exception {
            // Get a connection
            Connection connection = ConnectionUtil.getConnection();
            // Create a session channel; all communication between the client and the MQ service is done through the channel
            Channel channel = connection.createChannel();
            // Declare the queue
            // Parameters: String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            /**
             * Parameter details
             * 1. queue: the queue name
             * 2. durable: whether the queue is persistent; if so, the queue still exists after MQ restarts
             * 3. exclusive: whether the queue is exclusive to this connection. The queue can only be accessed from this connection and is deleted automatically when the connection closes. Setting this to true can be used to create temporary queues
             * 4. autoDelete: whether to delete the queue automatically when it is no longer in use. Setting both this parameter and exclusive to true gives a temporary queue (deleted automatically when no longer in use)
             * 5. arguments: extension arguments for the queue, for example a time to live
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Implement the consumption method
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // Receive and handle the message. This works like an event listener: it is called automatically when a message arrives
                /**
                 * Called when a message is received
                 * @param consumerTag the consumer tag, used to identify the consumer; it can be set in channel.basicConsume when listening to the queue
                 * @param envelope the envelope, which carries delivery metadata such as the exchange and delivery tag read below
                 * @param properties the message properties
                 * @param body the message content
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Exchange
                    String exchange = envelope.getExchange();
                    // Message id: the id MQ uses to identify the message within the channel; it can be used to confirm that the message has been received
                    long deliveryTag = envelope.getDeliveryTag();
                    // body is the message body
                    String msg = new String(body, "utf-8");
                    System.out.println(" [x] received : " + msg + "!");
                }
            };

            // Listen to the queue. The second parameter: whether to acknowledge messages automatically.
            // Parameters: String queue, boolean autoAck, Consumer callback
            /**
             * Parameter details:
             * 1. queue: the queue name
             * 2. autoAck: automatic acknowledgment. After receiving a message the consumer must tell MQ that it was received. If this is true, the acknowledgment is sent automatically; if false, the code must send it
             * 3. callback: the consumption method, executed when the consumer receives a message
             */
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

Console output:

Looking at the queue again, the message has been consumed.

 

Note that the consumer has received the message but the program has not stopped: it keeps listening for new messages in the queue. As soon as a new message arrives in the queue, it is printed immediately.

Message acknowledgment mechanism (ACK)

As the example above shows, once a message is received by a consumer, it is deleted from the queue.

So the question is: how does RabbitMQ know that the message has been received?

What if the consumer receives the message but crashes before doing anything with it, or throws an exception? Message consumption has failed, but RabbitMQ has no way of knowing, so the message is lost!

For this reason RabbitMQ has an ACK mechanism. When the consumer gets a message, it sends an acknowledgment (ACK) back to RabbitMQ to say that the message has been received. There are two kinds of ACK:

  • Automatic ACK: as soon as the message is received, the consumer sends the ACK automatically.

  • Manual ACK: after the message is received, no ACK is sent until the code calls it explicitly.

Which one is better?

It depends on how important the message is:

  • If the message is not important and losing it has no impact, automatic ACK is more convenient.

  • If the message is important and must not be lost, it is best to ACK manually after consumption completes. Otherwise, with automatic ACK, RabbitMQ deletes the message from the queue as soon as it is delivered, and if the consumer then crashes, the message is lost.
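The difference between the two modes can be seen in a toy model of a queue that tracks Ready and Unacked messages. This is only an in-memory sketch, not the RabbitMQ API: AckSketch and its methods are invented names, and the model assumes a single consumer.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of a queue with Ready and Unacked messages; not the RabbitMQ API.
class AckSketch {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Deque<String> unacked = new ArrayDeque<>();

    void publish(String message) {
        ready.addLast(message);
    }

    // Deliver the next message. With autoAck the broker forgets it immediately;
    // otherwise it is parked as Unacked until ack() is called.
    String deliver(boolean autoAck) {
        String message = ready.pollFirst();
        if (message != null && !autoAck) {
            unacked.addLast(message);
        }
        return message;
    }

    // Manual ACK: the oldest unacked message is removed for good.
    void ack() {
        unacked.pollFirst();
    }

    // The consumer went away: everything it had not acked becomes Ready again.
    void consumerClosed() {
        while (!unacked.isEmpty()) {
            ready.addFirst(unacked.pollLast());
        }
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckSketch auto = new AckSketch();
        auto.publish("order-1");
        auto.deliver(true);        // consumer crashes right after this call
        auto.consumerClosed();
        System.out.println("auto ACK, ready after crash: " + auto.readyCount());

        AckSketch manual = new AckSketch();
        manual.publish("order-1");
        manual.deliver(false);     // consumer crashes before calling ack()
        manual.consumerClosed();
        System.out.println("manual ACK, ready after crash: " + manual.readyCount());
    }
}
```

With automatic ACK nothing is left to redeliver after the crash, while with manual ACK the unacknowledged message goes back to Ready and can be delivered again.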

Our previous tests all used automatic ACK. To ACK manually, we need to change the code:

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    import java.io.IOException;

    public class Recv2 {
        private final static String QUEUE_NAME = "simple_queue";

        public static void main(String[] argv) throws Exception {
            // Get a connection
            Connection connection = ConnectionUtil.getConnection();
            // Create a channel
            final Channel channel = connection.createChannel();
            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Define the consumer of the queue
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // Receive and handle the message. This works like an event listener: it is called automatically when a message arrives
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // body is the message body
                    String msg = new String(body);
                    System.out.println(" [x] received : " + msg + "!");
                    // ACK manually
                    /*
                     * void basicAck(long deliveryTag, boolean multiple) throws IOException;
                     * deliveryTag: the id that identifies the message
                     * multiple: whether to acknowledge in batch. true: acknowledge at once all messages up to and including deliveryTag.
                     */
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // Listen to the queue; the second parameter is false, meaning manual ACK
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }

Note that the final basicConsume call sets the second parameter to false:

channel.basicConsume(QUEUE_NAME, false, consumer);

The problem with automatic ACK

Modify the consumer to throw an exception, as follows:

Leave the producer unchanged and run it directly; the message is sent successfully:

Run the consumer; the program throws an exception:

Management page:

The consumer threw an exception, yet the message was still consumed, even though we never actually processed it.

Demonstrating manual ACK

Run the producer again to send a message:

Likewise, throw an exception before the manual ack, and run Recv2:

Now look at the management page:

The message has not been consumed!

There is another case: modify the consumer Recv2 so that the second parameter of the queue listener is changed from automatic to manual (removing the exception introduced earlier), but do not ACK manually in the consumption method.

 

Producer code unchanged; run it again:

Run the consumer:

However, the management page shows:

Stop the consumer program, and the page shows:

This is because although we set manual ACK, the code never acknowledges the message, so the message is not really consumed. When we shut down this consumer, the message's state changes back to Ready.

The right approach is:

When listening to the queue, set the second parameter to false, and ACK manually in the code.

 

Run the consumer again and check the web management page:

Consumed successfully!

How producers avoid data loss: https://www.cnblogs.com/vipstone/p/9350075.html

 

② Work message model

Work queues, also called the competing consumers pattern

Compared with the introductory program, work queues add one more consumer: two consumers consume messages from the same queue, but each message can be obtained by only one consumer.

This message model is especially useful in web applications, where it can handle complex tasks that cannot be processed within a short HTTP request window.

Next, let's simulate the process:

P: the producer, which publishes tasks

C1: consumer 1, which picks up a task and completes it; assume it completes slowly (simulating a time-consuming task)

C2: consumer 2, which picks up a task and completes it; assume it completes faster

 

Producer

The producer sends 50 messages in a loop:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    public class Send {
        private final static String QUEUE_NAME = "test_work_queue";

        public static void main(String[] argv) throws Exception {
            // Get a connection
            Connection connection = ConnectionUtil.getConnection();
            // Get a channel
            Channel channel = connection.createChannel();
            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Publish tasks in a loop
            for (int i = 0; i < 50; i++) {
                // Message content
                String message = "task .. " + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");

                Thread.sleep(i * 2);
            }
            // Close the channel and the connection
            channel.close();
            connection.close();
        }
    }

 

Consumer 1

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    public class Recv {
        private final static String QUEUE_NAME = "test_work_queue";

        public static void main(String[] argv) throws Exception {
            // Get a connection
            Connection connection = ConnectionUtil.getConnection();
            // Create a session channel; all communication between the client and the MQ service is done through the channel
            Channel channel = connection.createChannel();
            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Implement the consumption method
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // Receive and handle the message. This works like an event listener: it is called automatically when a message arrives
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // body is the message body
                    String msg = new String(body, "utf-8");
                    System.out.println(" [consumer 1] received : " + msg + "!");
                    // Simulate a task that takes 1s
                    try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); }
                }
            };
            // Listen to the queue. The second parameter: whether to acknowledge messages automatically.
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

Consumer 2

The code is not shown; it is similar to consumer 1 except that consumer 2 has no simulated processing time.

Next, start both consumers and then send 50 messages:

 

You can see that the two consumers each consumed a different set of 25 messages, which achieves task distribution.
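The 25/25 split follows from the default dispatch, which hands messages to consumers in turn regardless of how busy they are. A minimal simulation of that turn-taking, with invented names (RoundRobinSketch, distribute) and no broker involved:

```java
import java.util.ArrayList;
import java.util.List;

// Simulates an even, take-turns hand-out of messages; not the RabbitMQ API.
class RoundRobinSketch {

    // Returns, per consumer, the messages it would receive when the queue
    // hands out messages in turn regardless of how busy each consumer is.
    static List<List<String>> distribute(int messages, int consumers) {
        List<List<String>> received = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messages; i++) {
            received.get(i % consumers).add("task .. " + i);
        }
        return received;
    }

    public static void main(String[] args) {
        List<List<String>> result = distribute(50, 2);
        System.out.println("consumer 1 got " + result.get(0).size());
        System.out.println("consumer 2 got " + result.get(1).size());
    }
}
```

Each consumer ends up with 25 of the 50 tasks, one with the even-numbered tasks and the other with the odd-numbered ones, no matter how long either takes per task.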

Let the capable do more

Is there a problem with the implementation above?

  • Consumer 1 is less efficient than consumer 2 and takes longer per task.

  • Yet the two end up consuming the same number of messages.

  • Consumer 2 is idle much of the time, while consumer 1 is always busy.

Currently tasks are distributed evenly. The right approach is: the faster a consumer consumes, the more it gets.

How can this be achieved?

Set prefetchCount = 1 through the basicQos method. RabbitMQ will then let each consumer handle at most 1 message at a time. In other words, until it receives the consumer's ack, it will not dispatch a new message to that consumer; instead, it assigns the message to the next consumer that is not busy.

Note: prefetchCount only takes effect with manual ack; it has no effect with automatic ack.
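In the Java client this setting corresponds to calling channel.basicQos(1) before basicConsume, combined with manual ack. To see why it changes the distribution, here is a small discrete-time simulation that runs without a broker. It is a sketch under stated assumptions: all 50 tasks are already in the queue, the per-task costs (10 and 1 time units) are made up, and FairDispatchSketch and dispatch are hypothetical names.

```java
// Discrete-time simulation of dispatch with prefetchCount = 1; not the RabbitMQ API.
class FairDispatchSketch {

    // costs[c] is the hypothetical time consumer c needs per message.
    // Each message goes to the consumer that becomes free (has acked) first;
    // ties go to the lower index. Returns how many messages each consumer handled.
    static int[] dispatch(int messages, int[] costs) {
        int[] freeAt = new int[costs.length];   // time at which each consumer acks its current message
        int[] handled = new int[costs.length];
        for (int i = 0; i < messages; i++) {
            int next = 0;
            for (int c = 1; c < costs.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            freeAt[next] += costs[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        // Consumer 1 needs 10 time units per task, consumer 2 needs 1.
        int[] handled = dispatch(50, new int[] {10, 1});
        System.out.println("consumer 1 handled " + handled[0]);
        System.out.println("consumer 2 handled " + handled[1]);
    }
}
```

With these made-up costs the slow consumer handles 5 of the 50 tasks and the fast one handles 45, instead of 25 each.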

Test again:

 

Subscription model categories

Notes:

1. One producer, multiple consumers
2. Each consumer has its own queue
3. The producer does not send messages directly to a queue but to an exchange
4. Each queue must be bound to the exchange
5. Messages sent by the producer pass through the exchange to the queues, so that one message is consumed by multiple consumers
Example: registration -> send email, send SMS

X (Exchange): on one hand, the exchange receives messages from producers; on the other hand, it knows how to handle them, for example delivering to a particular queue, delivering to all queues, or discarding the message. Which it does depends on the type of the exchange.

Exchanges come in the following types:

Fanout: broadcast; delivers the message to all queues bound to the exchange

Direct: directed; delivers the message to the queues whose routing key matches

Topic: wildcard; delivers the message to the queues that match the routing pattern

Headers: the headers mode differs from routing in that it drops the routing key and uses key/value pairs in the message headers to match queues.

The headers mode is not expanded on here; if you are interested, see https://blog.csdn.net/zhu_tianwei/article/details/40923131

An exchange is only responsible for forwarding messages; it cannot store them. So if no queue is bound to the exchange, or no queue matches the routing rule, the message is lost!
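The routing rules for the two simplest types can be written down as a small in-memory model. This is only a sketch: it covers fanout and direct, leaves out topic and headers, and ExchangeSketch, bind and route are names invented for the illustration rather than RabbitMQ client calls.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy exchange that only models fanout and direct routing; not the RabbitMQ API.
class ExchangeSketch {
    private final String type; // "fanout" or "direct"
    // queue name -> binding keys for that queue
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    ExchangeSketch(String type) {
        this.type = type;
    }

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(queue, k -> new ArrayList<>()).add(bindingKey);
    }

    // Returns the queues that would receive a message published with routingKey.
    // An empty result means the message is dropped, since the exchange stores nothing.
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : bindings.entrySet()) {
            boolean matches = type.equals("fanout") || e.getValue().contains(routingKey);
            if (matches) {
                targets.add(e.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        ExchangeSketch fanout = new ExchangeSketch("fanout");
        fanout.bind("sms_queue", "");
        fanout.bind("email_queue", "");
        System.out.println("fanout -> " + fanout.route("anything"));

        ExchangeSketch direct = new ExchangeSketch("direct");
        direct.bind("sms_queue", "sms");
        direct.bind("email_queue", "email");
        System.out.println("direct sms -> " + direct.route("sms"));
        System.out.println("direct unknown -> " + direct.route("push"));
    }
}
```

The fanout exchange ignores the routing key and reaches both queues, the direct exchange reaches only the queue bound with a matching key, and a key that matches no binding yields an empty target list, which is the "message is lost" case described above.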

③ Publish/subscribe (exchange type: fanout, also known as broadcast)

Publish/subscribe model diagram:

 

Producer

Differences from the previous two models:

  • 1) Declare an Exchange instead of a Queue

  • 2) Send messages to the Exchange instead of a Queue

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    public class Send {

        private final static String EXCHANGE_NAME = "test_fanout_exchange";

        public static void main(String[] argv) throws Exception {
            // Get a connection
            Connection connection = ConnectionUtil.getConnection();
            // Get a channel
            Channel channel = connection.createChannel();
            // Declare the exchange, specifying the type fanout
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            // Message content
            String message = "Registered successfully!!";
            // Publish the message to the exchange
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [producer] Sent '" + message + "'");

            channel.close();
            connection.close();
        }
    }

Consumer 1 (after successful registration, the message goes to the SMS service)

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    import java.io.IOException;

    public class Recv {
        private final static String QUEUE_NAME = "fanout_exchange_queue_sms"; // SMS queue

        private final static String EXCHANGE_NAME = "test_fanout_exchange";

        public static void main(String[] argv) throws Exception {
            // Get a connection
            Connection connection = ConnectionUtil.getConnection();
            // Get a channel
            Channel channel = connection.createChannel();
            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Bind the queue to the exchange
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

            // Define the consumer of the queue
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // Receive and handle the message. This works like an event listener: it is called automatically when a message arrives
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    // body is the message body
                    String msg = new String(body);
                    System.out.println(" [SMS service] received : " + msg + "!");
                }
            };
            // Listen to the queue, with automatic ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

Consumer 2 (after successful registration, the message goes to the email service)

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    import java.io.IOException;

    public class Recv2 {
        private final static String QUEUE_NAME = "fanout_exchange_queue_email"; // Email queue

        private final static String EXCHANGE_NAME = "test_fanout_exchange";

        public static void main(String[] argv) throws Exception {
            // Get a connection
            Connection connection = ConnectionUtil.getConnection();
            // Get a channel
            Channel channel = connection.createChannel();
            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Bind the queue to the exchange
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

            // Define the consumer of the queue
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // Receive and handle the message. This works like an event listener: it is called automatically when a message arrives
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    // body is the message body
                    String msg = new String(body);
                    System.out.println(" [email service] received : " + msg + "!");
                }
            };
            // Listen to the queue, with automatic ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

Start both consumers and then send 1 message:

Think about it:

1. What is the difference between publish/subscribe and work queues?

Differences:

1) Work queues do not need to define an exchange, while publish/subscribe does.

2) A publish/subscribe producer sends messages to an exchange, while a work queues producer sends messages to a queue (using the default exchange underneath).

3) Publish/subscribe requires binding the queue to the exchange; work queues does not, because the queue is actually bound to the default exchange.

Similarities:

The effect is the same in both: multiple consumers listening to the same queue will not consume the same message twice.

2. In practice, should you use publish/subscribe or work queues?

Publish/subscribe is recommended: it is more powerful than the work queue mode (consumers can still compete on the same queue), and it lets you specify your own dedicated exchange.

④ Routing model (exchange type: direct)

Routing model diagram:

 

P: the producer, which sends messages to the Exchange and specifies a routing key when sending.

X: the Exchange, which receives messages from the producer and delivers each one to the queues whose binding exactly matches the routing key

C1: a consumer whose queue is bound to receive messages with routing key error

C2: a consumer whose queue is bound to receive messages with routing key info, error, or warning

Now let's look at the code:

Producer

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    public class Send {
        private final static String EXCHANGE_NAME = "test_direct_exchange";

        public static void main(String[] argv) throws Exception {
            // Get a connection
            Connection connection = ConnectionUtil.getConnection();
            // Get a channel
            Channel channel = connection.createChannel();
            // Declare the exchange, specifying the type direct
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // Message content
            String message = "Registered successfully! Reply [T] by SMS to unsubscribe";
            // Send the message with routing key sms, so only the SMS service receives it
            channel.basicPublish(EXCHANGE_NAME, "sms", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            channel.close();
            connection.close();
        }
    }

Consumer 1

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class Recv {
        private final static String QUEUE_NAME = "direct_exchange_queue_sms"; // SMS queue
        private final static String EXCHANGE_NAME = "test_direct_exchange";

        public static void main(String[] argv) throws Exception {
            // Get a connection
            Connection connection = ConnectionUtil.getConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Bind the queue to the exchange with a routing key; several keys may be bound.
            // The exchange must already exist, so run the producer once first.
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms"); // receive messages published with routing key sms
            //channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");

            // Define the consumer for the queue
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // Called automatically whenever a message arrives, much like an event listener
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    // body is the message payload
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println(" [SMS service] received : " + msg + "!");
                }
            };
            // Listen on the queue with automatic ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

Consumer 2

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class Recv2 {
        private final static String QUEUE_NAME = "direct_exchange_queue_email"; // email queue
        private final static String EXCHANGE_NAME = "test_direct_exchange";

        public static void main(String[] argv) throws Exception {
            // Get a connection
            Connection connection = ConnectionUtil.getConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Bind the queue to the exchange with a routing key; several keys may be bound
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email"); // receive messages published with routing key email

            // Define the consumer for the queue
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // Called automatically whenever a message arrives, much like an event listener
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    // body is the message payload
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println(" [Email service] received : " + msg + "!");
                }
            };
            // Listen on the queue with automatic ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

Sending a message with the routing key sms shows that only consumer 1, the SMS consumer, receives it.
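The exact-match rule of a direct exchange can also be modelled without a broker. The following is a minimal in-memory sketch, not RabbitMQ code: the class DirectExchangeModel and its methods bind and route are hypothetical names introduced for illustration, and a plain map stands in for the exchange's binding table.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** In-memory model of a direct exchange: a queue gets a message only on an exact key match. */
final class DirectExchangeModel {
    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    /** Plays the role of channel.queueBind(queue, exchange, bindingKey) for one exchange. */
    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new TreeSet<>()).add(queue);
    }

    /** Names of the queues that would receive a message published with this routing key. */
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, new TreeSet<>());
    }
}
```

With direct_exchange_queue_sms bound to "sms" and direct_exchange_queue_email bound to "email", route("sms") returns only the SMS queue, and a key with no binding returns an empty set, which corresponds to the message being discarded.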

 

⑤ Topics wildcard mode (exchange type: topic)

Topics model diagram:

  Each consumer listens on its own queue, which is bound with a routing key pattern. The producer sends a message to the broker, and the exchange forwards it to the matching queues according to the routing key.

A routing key usually consists of one or more words separated by ".", for example: inform.sms

Wildcard rules:

#: matches zero or more words

*: matches exactly one word

Examples:

audit.#: matches audit.irs.corporate and audit.irs (and also plain audit)

audit.*: matches audit.irs, but not audit.irs.corporate

As the diagram shows, we are going to send messages that all describe animals. Each message is sent with a routing key made of three words (two dots). The first word describes speed, the second color, and the third species: "<speed>.<color>.<species>".

We create three bindings: Q1 is bound with "*.orange.*", and Q2 is bound with "*.*.rabbit" and "lazy.#".

Q1 receives messages about all orange animals.

Q2 receives messages about rabbits and about lazy animals.

  A quick exercise: if the producer sends messages with the following routing keys, which queues will they reach?

quick.orange.rabbit         Q1, Q2   (routed to both Q1 and Q2)

lazy.orange.elephant        Q1, Q2

quick.orange.fox            Q1

lazy.pink.rabbit            Q2       (this key matches both of Q2's binding keys, but is delivered to Q2 only once)

quick.brown.fox             matches no queue; discarded

quick.orange.male.rabbit    matches no queue; discarded

orange                      matches no queue; discarded
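The answers above can be checked offline with a small matcher. This is only a sketch of the wildcard rules, assuming "*" stands for exactly one word and "#" for zero or more words; it is not how RabbitMQ implements topic routing internally, and the names TopicMatcher, matches, and route are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Offline sketch of topic-exchange wildcard matching. */
final class TopicMatcher {
    private TopicMatcher() {}

    /** True if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;        // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible continuation point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                // key used up, but the pattern still needs a word
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    /** Queues from the exercise (Q1, Q2) that would receive the given routing key. */
    static List<String> route(String routingKey) {
        List<String> queues = new ArrayList<>();
        if (matches("*.orange.*", routingKey)) {
            queues.add("Q1");
        }
        // Q2 has two bindings, but a message is delivered to it at most once
        if (matches("*.*.rabbit", routingKey) || matches("lazy.#", routingKey)) {
            queues.add("Q2");
        }
        return queues;
    }
}
```

For example, TopicMatcher.route("lazy.pink.rabbit") yields a list containing only Q2, and TopicMatcher.route("quick.orange.male.rabbit") yields an empty list, because "*.orange.*" and "*.*.rabbit" both require exactly three words.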

Let's verify the first answer against the broker, using the routing key "quick.orange.rabbit".

Producer

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import java.nio.charset.StandardCharsets;

    public class Send {
        private final static String EXCHANGE_NAME = "test_topic_exchange";

        public static void main(String[] argv) throws Exception {
            // Get a connection
            Connection connection = ConnectionUtil.getConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            // Declare the exchange with type topic
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            // Message content
            String message = "This is a fast orange rabbit";
            // Publish with routing key quick.orange.rabbit
            channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [Animal description] Sent '" + message + "'");

            channel.close();
            connection.close();
        }
    }

Consumer 1

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class Recv {
        private final static String QUEUE_NAME = "topic_exchange_queue_Q1";
        private final static String EXCHANGE_NAME = "test_topic_exchange";

        public static void main(String[] argv) throws Exception {
            // Get a connection
            Connection connection = ConnectionUtil.getConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Bind the queue to the exchange: subscribe to all orange animals
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");

            // Define the consumer for the queue
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // Called automatically whenever a message arrives, much like an event listener
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    // body is the message payload
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println(" [Consumer 1] received : " + msg + "!");
                }
            };
            // Listen on the queue with automatic ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

Consumer 2

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class Recv2 {
        private final static String QUEUE_NAME = "topic_exchange_queue_Q2";
        private final static String EXCHANGE_NAME = "test_topic_exchange";

        public static void main(String[] argv) throws Exception {
            // Get a connection
            Connection connection = ConnectionUtil.getConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Bind the queue to the exchange: subscribe to rabbits and to lazy animals
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");

            // Define the consumer for the queue
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // Called automatically whenever a message arrives, much like an event listener
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    // body is the message payload
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println(" [Consumer 2] received : " + msg + "!");
                }
            };
            // Listen on the queue with automatic ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

Result: both C1 and C2 receive the message.

 

⑥ RPC

RPC model diagram:

Basic concepts:

Callback queue: the client sends a request to the server, and the server stores the result once it has processed the request. So that the client can retrieve that result, every request also carries the address of a callback queue in its reply_to property.

Correlation id: a client may have several requests outstanding, so when replies arrive in the callback queue it cannot tell which reply belongs to which request. To solve this, the client attaches a unique correlation_id property to every request. The server copies it onto the reply, and the client uses its value to match each reply in the callback queue to the request it answers.

Process description:

  • When the client starts, it creates an anonymous, exclusive callback queue.
  • For each RPC request, the client sends a message with two properties: reply_to, set to the callback queue, and correlation_id, set to a value unique to that request.
  • The request is sent to the rpc_queue queue.
  • The server waits for requests on that queue. When one arrives, it does the work and sends a message containing the result to the queue named in the reply_to field.
  • The client waits for data on the callback queue. When a message arrives, it checks the correlation_id property; if the value matches that of the request, it returns the response to the application.
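The steps above can be sketched with an in-memory stand-in for the broker. This is a simulation meant only to show how reply_to and correlation_id work together, not RabbitMQ client code: the class RpcSimulation, its nested Message type, and the methods sendRequest, serveOne, and pollReply are all hypothetical names, and ordinary Java queues take the place of rpc_queue and the callback queue.

```java
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.UnaryOperator;

/** In-memory stand-in for a broker, used only to illustrate reply_to and correlation_id. */
final class RpcSimulation {

    /** A message with the two properties the RPC pattern relies on. */
    static final class Message {
        final String correlationId;
        final String replyTo;   // null on replies
        final String body;

        Message(String correlationId, String replyTo, String body) {
            this.correlationId = correlationId;
            this.replyTo = replyTo;
            this.body = body;
        }
    }

    // queue name -> queue contents
    private final Map<String, Queue<Message>> queues = new ConcurrentHashMap<>();

    private Queue<Message> queue(String name) {
        return queues.computeIfAbsent(name, n -> new ConcurrentLinkedQueue<>());
    }

    /** Client: publish a request to rpc_queue and return its fresh correlation id. */
    String sendRequest(String callbackQueue, String body) {
        String correlationId = UUID.randomUUID().toString();
        queue("rpc_queue").add(new Message(correlationId, callbackQueue, body));
        return correlationId;
    }

    /** Server: handle one pending request and reply to the queue named in reply_to. */
    boolean serveOne(UnaryOperator<String> handler) {
        Message request = queue("rpc_queue").poll();
        if (request == null) {
            return false;                // no request waiting
        }
        String result = handler.apply(request.body);
        // the reply carries the same correlation id as the request
        queue(request.replyTo).add(new Message(request.correlationId, null, result));
        return true;
    }

    /** Client: drain the callback queue and return the reply whose id matches, else null. */
    String pollReply(String callbackQueue, String correlationId) {
        Message reply;
        while ((reply = queue(callbackQueue).poll()) != null) {
            if (reply.correlationId.equals(correlationId)) {
                return reply.body;
            }
            // replies with a different correlation id are dropped
        }
        return null;
    }
}
```

A round trip is then: call sendRequest with the callback queue name and a body, let the server run serveOne with its handler, and call pollReply with the returned correlation id. Dropping replies whose id does not match is a simplification chosen for this sketch; a client with several requests in flight would instead keep a map from correlation id to the pending call.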

Two interview questions to share:

How do you avoid a message backlog?

1) Use a work queue: have multiple consumers listen on the same queue.

2) After receiving a message, hand it to a thread pool so that it is processed asynchronously.
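Both ideas can be illustrated together without a broker. In the sketch below, an in-memory queue stands in for one RabbitMQ queue and a fixed thread pool plays the competing consumers; the class CompetingConsumers and the method consumeAll are hypothetical names. Note that with a real broker, handing messages to a pool under automatic ACK means a crash can lose whatever the pool has not finished, so manual ACK is the safer pairing.

```java
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Several workers draining one queue: every message is handled by exactly one of them. */
final class CompetingConsumers {
    private CompetingConsumers() {}

    /** Drains the messages with the given number of workers; returns how often each body was handled. */
    static Map<String, Integer> consumeAll(List<String> messages, int workers) {
        Queue<String> queue = new ConcurrentLinkedQueue<>(messages); // stand-in for one MQ queue
        Map<String, Integer> handled = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                String msg;
                // poll() hands each message to exactly one worker
                while ((msg = queue.poll()) != null) {
                    handled.merge(msg, 1, Integer::sum);
                }
            });
        }
        pool.shutdown();
        try {
            // wait for the workers to finish draining the queue
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return handled;
    }
}
```

Calling consumeAll with 100 distinct message bodies and 4 workers returns a map with 100 entries, each with the count 1: adding workers speeds up draining without any message being processed twice.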

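How do you avoid losing messages?

1) Use the consumer ACK mechanism, which prevents messages from being lost on the consumer side. With manual acknowledgement (pass autoAck = false to channel.basicConsume, then call channel.basicAck(envelope.getDeliveryTag(), false) once processing succeeds), a message that is never acknowledged is requeued when the consumer's channel closes.

But what if MQ goes down before the consumer has consumed the message? Is the message lost?

2) Messages can be persisted. Persisting a message requires that the queue and the exchange are durable as well.

Exchange persistence: declare the exchange as durable, for example channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true).

Queue persistence: declare the queue with durable set to true, for example channel.queueDeclare(QUEUE_NAME, true, false, false, null).

Message persistence: publish with a persistent delivery mode, for example by passing MessageProperties.PERSISTENT_TEXT_PLAIN as the properties argument of channel.basicPublish.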

 

 Integrating RabbitMQ with Spring

The following simulates a registration service that, when a user registers successfully, pushes messages to the SMS and email services.

Set up the Spring Boot environment

Create two projects, mq-rabbitmq-producer and mq-rabbitmq-consumer, and apply steps 1, 2, and 3 below to each (in this example the consumer is configured with annotations, so step 3 is optional for it).

1. Add the AMQP starter:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>

2. Add the RabbitMQ configuration to application.yml:

    server:
      port: 10086
    spring:
      application:
        name: mq-rabbitmq-producer
      rabbitmq:
        host: 192.168.1.103
        port: 5672
        username: kavito
        password: 123456
        virtual-host: /kavito
        template:
          retry:
            enabled: true
            initial-interval: 10000ms
            max-interval: 300000ms
            multiplier: 2
          exchange: topic.exchange
        publisher-confirms: true

Property descriptions:

  • template: configuration for AmqpTemplate

    • retry: retry on failure

      • enabled: turns retrying on

      • initial-interval: the wait before the first retry

      • max-interval: the maximum interval between retries; the interval stops growing once it reaches this value

      • multiplier: the factor applied to the interval after each retry; with 2, each wait is twice as long as the previous one

    • exchange: the default exchange name, used when a message is sent without specifying an exchange

  • publisher-confirms: the publisher confirm mechanism, which verifies that a message reached the broker; if sending fails, a negative acknowledgement comes back so that the failure can be handled, for example by retrying. (Spring Boot 2.2 and later deprecate this property in favor of publisher-confirm-type.)
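To make the interplay of initial-interval, multiplier, and max-interval concrete, here is a small calculation of the resulting wait times. It is a model of exponential back-off with a cap, written for illustration; it does not call Spring, and the class RetryBackoff and the method intervals are hypothetical names. How many retries actually happen is governed by a separate setting, not by max-interval.

```java
import java.util.ArrayList;
import java.util.List;

/** Computes the waits of a capped exponential back-off. */
final class RetryBackoff {
    private RetryBackoff() {}

    /** Waits in ms before each retry: start at initialMs, multiply each time, never exceed maxMs. */
    static List<Long> intervals(long initialMs, double multiplier, long maxMs, int retries) {
        List<Long> result = new ArrayList<>();
        double next = initialMs;
        for (int i = 0; i < retries; i++) {
            result.add((long) Math.min(next, maxMs));
            next = Math.min(next * multiplier, maxMs);   // grow, but stay under the cap
        }
        return result;
    }
}
```

With the values from the configuration above, RetryBackoff.intervals(10000, 2.0, 300000, 7) gives 10000, 20000, 40000, 80000, 160000, 300000, 300000: the wait doubles until it hits the 300000 ms cap and then stays there.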

Of course, if the consumer only receives messages and never sends them, it does not need the template settings.

 3. Define the RabbitmqConfig configuration class, which declares the exchange, the queues, and the bindings between them.

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Exchange;
    import org.springframework.amqp.core.ExchangeBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RabbitmqConfig {
        public static final String QUEUE_EMAIL = "queue_email"; // email queue
        public static final String QUEUE_SMS = "queue_sms"; // sms queue
        public static final String EXCHANGE_NAME = "topic.exchange"; // topic exchange
        public static final String ROUTINGKEY_EMAIL = "topic.#.email.#";
        public static final String ROUTINGKEY_SMS = "topic.#.sms.#";

        // Declare the exchange
        @Bean(EXCHANGE_NAME)
        public Exchange exchange() {
            // durable(true): the exchange survives a RabbitMQ restart
            return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
        }

        // Declare the email queue
        /*
         * new Queue(QUEUE_EMAIL) is equivalent to new Queue(QUEUE_EMAIL, true, false, false)
         * durable = true: the queue is persisted and need not be recreated after RabbitMQ restarts
         * exclusive: whether the queue is usable only by the current connection; defaults to false
         * auto-delete: whether the queue is deleted automatically once it is no longer in use; defaults to false
         */
        @Bean(QUEUE_EMAIL)
        public Queue emailQueue() {
            return new Queue(QUEUE_EMAIL);
        }

        // Declare the sms queue
        @Bean(QUEUE_SMS)
        public Queue smsQueue() {
            return new Queue(QUEUE_SMS);
        }

        // Bind the email queue to the exchange with ROUTINGKEY_EMAIL
        @Bean
        public Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue,
                                    @Qualifier(EXCHANGE_NAME) Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
        }

        // Bind the sms queue to the exchange with ROUTINGKEY_SMS
        @Bean
        public Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue,
                                  @Qualifier(EXCHANGE_NAME) Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
        }
    }

Producer (mq-rabbitmq-producer)

To make testing easy, I put the producer code directly in the project's test class. It sends messages with the routing key "topic.sms.email", so in mq-rabbitmq-consumer every listener whose queue is bound to the exchange topic.exchange with a binding key that matches "topic.sms.email" receives them. Both "topic.#.email.#" and "topic.#.sms.#" match, because # can also stand for zero words.

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;

    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class Send {

        @Autowired
        RabbitTemplate rabbitTemplate;

        @Test
        public void sendMsgByTopics() {
            /*
             * convertAndSend parameters:
             * 1. exchange name
             * 2. routing key
             * 3. message content
             */
            for (int i = 0; i < 5; i++) {
                String message = "Congratulations, registration successful! userid=" + i;
                rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "topic.sms.email", message);
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }

Run the test class to send 5 messages:

Web management UI: you can see that the exchange and the two queues, queue_email and queue_sms, have been created, and that 5 messages were delivered to each queue.

 

 

Consumer (mq-rabbitmq-consumer)

Write a listener component that uses annotations to configure the consumer queues and the bindings between the queues and the exchange. (This could also be done with a configuration class, as on the producer side.)

Spring AMQP encapsulates and abstracts the message consumer: any method of a Spring bean becomes a consumer simply by adding the @RabbitListener annotation.

    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    public class ReceiveHandler {

        // Listen on the email queue
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "queue_email", durable = "true"),
                exchange = @Exchange(
                        value = "topic.exchange",
                        ignoreDeclarationExceptions = "true",
                        type = ExchangeTypes.TOPIC
                ),
                key = {"topic.#.email.#", "email.*"}))
        public void rece_email(String msg) {
            System.out.println(" [Email service] received : " + msg + "!");
        }

        // Listen on the SMS queue
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "queue_sms", durable = "true"),
                exchange = @Exchange(
                        value = "topic.exchange",
                        ignoreDeclarationExceptions = "true",
                        type = ExchangeTypes.TOPIC
                ),
                key = {"topic.#.sms.#"}))
        public void rece_sms(String msg) {
            System.out.println(" [SMS service] received : " + msg + "!");
        }
    }

Property descriptions:

  • @Component: class-level annotation that registers the class in the Spring container

  • @RabbitListener: method-level annotation that declares the method a consumer; it takes the following attributes:

    • bindings: the bindings, of which there can be several. The value is an array of @QueueBinding, which has these attributes:

      • value: the queue this consumer reads from, given as a @Queue

      • exchange: the exchange the queue is bound to, given as an @Exchange

      • key: the routing keys that bind the queue to the exchange; more than one may be given

Start the mq-rabbitmq-consumer project.

OK: once the email and SMS services have received the messages, each can carry on with its own business logic.

Copyright notice
This article was written by [Fengshuwan River Bridge]. Please include a link to the original when reposting. Thank you.