Use of message queues
2022-06-27 12:19:00 【Jinlin】
3.1 The core of RabbitMQ
The official website introduces the core concepts, such as connection and channel. What exactly goes on inside them, who cares?
Overall, what matters for business implementation is: 1) how a message is delivered; 2) how consumers consume messages; 3) whether delivery is reliable; 4) the delivery mode; 5) the life cycle of a message; 6) the life cycle of a queue.
3.2 How is a message delivered? (Remember one thing: producers publish to an exchange, not to a queue.)
In RabbitMQ, producers deliver messages to an exchange. An exchange may have many queues bound to it, so how does it route a message to those queues?
First, the advantages of the exchange design: 1) It evidently borrows from the idea of a switch at the data link layer. Besides giving a clear hierarchy, it may also improve link utilization (the analogy is only a loose one).
2) At the code level: without exchanges, you would have to maintain at least one very large routing table and route every message through it. With exchanges, that table is split across multiple exchanges, and the benefit is obvious.
3) It also gives a high degree of decoupling: different exchanges can have different routing rules. Without an exchange ......
In RabbitMQ an exchange has 4 routing modes, the 4 constants of the enum BuiltinExchangeType:
DIRECT: the exchange reads the routing key of each message and delivers the message to the queues bound with that same key (if(msg.routekey.equals(queue.routekey))).
FANOUT: the routing key is not checked at all; the message is delivered to every queue bound to the exchange.
TOPIC, HEADERS: see what the official website says; I don't want to type any more ^_^||
In short, one call sends the message: channel.basicPublish(exchange_name,route_key,false,bs,"test".getBytes()); You can look up this API on the official website.
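The difference between DIRECT and FANOUT routing can be sketched with a small in-memory model. This is only a toy illustration, not RabbitMQ's implementation or client API: the names ExchangeRoutingSketch, Exchange, bind and route are hypothetical, and the model allows just one binding key per queue for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of exchange routing; NOT the RabbitMQ client API.
class ExchangeRoutingSketch {
    enum Type { DIRECT, FANOUT }

    // Hypothetical exchange that remembers the binding key of each bound queue.
    static class Exchange {
        final Type type;
        // queue name -> binding key (simplification: one key per queue)
        final Map<String, String> bindings = new LinkedHashMap<>();

        Exchange(Type type) {
            this.type = type;
        }

        void bind(String queueName, String bindingKey) {
            bindings.put(queueName, bindingKey);
        }

        // Returns the names of the queues that would receive the message.
        List<String> route(String routingKey) {
            List<String> targets = new ArrayList<>();
            for (Map.Entry<String, String> b : bindings.entrySet()) {
                // FANOUT ignores the key; DIRECT requires an exact match.
                if (type == Type.FANOUT || b.getValue().equals(routingKey)) {
                    targets.add(b.getKey());
                }
            }
            return targets;
        }
    }

    public static void main(String[] args) {
        Exchange direct = new Exchange(Type.DIRECT);
        direct.bind("orders", "order.created");
        direct.bind("audit", "audit.log");
        System.out.println(direct.route("order.created")); // prints [orders]

        Exchange fanout = new Exchange(Type.FANOUT);
        fanout.bind("orders", "order.created");
        fanout.bind("audit", "audit.log");
        System.out.println(fanout.route("anything")); // prints [orders, audit]
    }
}
```

In the model, the only thing that changes between the two modes is the condition inside route, which is why a producer can stay unaware of how many queues sit behind an exchange.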
3.3 How do consumers consume messages? (Remember one thing: consumers consume from a queue, which is a little different from producers.)
There is no need to deal with TCP long connections and heartbeats yourself. It is a single API call: channel.basicConsume(QUEUE_AUTODELETE, true, consumer); where consumer is an instance of Consumer.
You just implement the callback and that's it.
3.4 Is message delivery reliable?
Generally yes, unless you declare the queue as non-durable (or publish the messages as non-persistent) and then restart the broker; in that case the messages are lost. There is also an acknowledgement mechanism: you can choose how consumed messages are acknowledged and acknowledge them manually,
via the autoAck parameter of channel.basicConsume(?,autoAck,?).
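Why manual acknowledgement matters can be shown with a toy model. This is a sketch under stated assumptions, not the RabbitMQ client API: AckSketch, ToyQueue, deliver, ack and consumerGone are hypothetical names. The one behaviour it models is that with autoAck the broker forgets a message as soon as it is delivered, while with manual ack it keeps the message until it is acknowledged and puts it back in the queue if the consumer disappears first.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model of acknowledgements; NOT the RabbitMQ client API.
class AckSketch {
    static class ToyQueue {
        private final Deque<String> ready = new ArrayDeque<>();
        private final Map<Long, String> unacked = new HashMap<>();
        private long nextTag = 1;

        void publish(String msg) {
            ready.addLast(msg);
        }

        // Delivers the next message and returns its delivery tag, or -1 if the queue is empty.
        long deliver(boolean autoAck) {
            String msg = ready.pollFirst();
            if (msg == null) {
                return -1;
            }
            long tag = nextTag++;
            if (!autoAck) {
                unacked.put(tag, msg); // kept until the consumer acknowledges it
            }
            return tag;
        }

        void ack(long tag) {
            unacked.remove(tag);
        }

        // The consumer went away: unacknowledged messages go back to the queue.
        void consumerGone() {
            for (String msg : unacked.values()) {
                ready.addFirst(msg);
            }
            unacked.clear();
        }

        int readyCount() {
            return ready.size();
        }
    }

    public static void main(String[] args) {
        ToyQueue auto = new ToyQueue();
        auto.publish("a");
        auto.deliver(true);      // autoAck: gone once delivered
        auto.consumerGone();     // consumer crashed before processing
        System.out.println(auto.readyCount());   // prints 0 (message lost)

        ToyQueue manual = new ToyQueue();
        manual.publish("b");
        manual.deliver(false);   // manual ack: still tracked by the queue
        manual.consumerGone();   // crashed before ack
        System.out.println(manual.readyCount()); // prints 1 (message requeued)
    }
}
```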
3.5 The life cycle of a message
Once the consumer acknowledges a message, the message is marked as consumed and is removed from the queue.
3.6 Queue lifecycle
channel.queueDeclare(QUEUE_NAME,false,false,true,null);
Setting the second parameter (durable) to true makes the queue itself survive a broker restart. Setting the fourth parameter (autoDelete) to true means the server deletes the queue once it is no longer in use, that is, after its last consumer has gone. Check the API documentation for details.
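The effect of the two flags on a queue's lifetime can be sketched with a toy broker. Again this is an illustrative model and not the RabbitMQ client API: QueueLifecycleSketch, ToyBroker and its methods are hypothetical names, and only the durable and autoDelete flags are modelled.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of queue lifetime flags; NOT the RabbitMQ client API.
class QueueLifecycleSketch {
    static class ToyQueue {
        final boolean durable;
        final boolean autoDelete;
        int consumers = 0;

        ToyQueue(boolean durable, boolean autoDelete) {
            this.durable = durable;
            this.autoDelete = autoDelete;
        }
    }

    static class ToyBroker {
        private final Map<String, ToyQueue> queues = new HashMap<>();

        void declare(String name, boolean durable, boolean autoDelete) {
            queues.putIfAbsent(name, new ToyQueue(durable, autoDelete));
        }

        void subscribe(String name) {
            queues.get(name).consumers++;
        }

        // When the last consumer leaves, an auto-delete queue is removed.
        void unsubscribe(String name) {
            ToyQueue q = queues.get(name);
            q.consumers--;
            if (q.autoDelete && q.consumers == 0) {
                queues.remove(name);
            }
        }

        // After a restart only durable queues still exist, with no consumers attached.
        void restart() {
            queues.values().removeIf(q -> !q.durable);
            queues.values().forEach(q -> q.consumers = 0);
        }

        boolean exists(String name) {
            return queues.containsKey(name);
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.declare("temp", false, true);    // non-durable, auto-delete
        broker.declare("orders", true, false);  // durable, not auto-delete
        broker.declare("scratch", false, false);

        broker.subscribe("temp");
        broker.unsubscribe("temp");
        System.out.println(broker.exists("temp"));    // prints false

        broker.restart();
        System.out.println(broker.exists("orders"));  // prints true
        System.out.println(broker.exists("scratch")); // prints false
    }
}
```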
4. An example
4.1 Producer code:
Add the dependencies and imports yourself.
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("username");
factory.setPort(5672); // Note that this port is different from the management plugin's port
factory.setPassword("pwd");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Declare a durable exchange of type direct
channel.exchangeDeclare("exchange_name", BuiltinExchangeType.DIRECT, true);
// Declare a non-durable, auto-delete queue (deleted once it is no longer in use)
channel.queueDeclare("queue_name", false, false, true, null);
// Bind the queue to the exchange
channel.queueBind("queue_name", "exchange_name", "route_key");
// Build the message properties with a custom header
Map<String, Object> header = new HashMap<>();
AMQP.BasicProperties.Builder b = new AMQP.BasicProperties.Builder();
header.put("charset", "utf-8");
b.headers(header);
AMQP.BasicProperties bp = b.build();
// Publish the message
channel.basicPublish("exchange_name", "route_key", false, bp, "test3".getBytes());
4.2 Consumer code
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("username");
factory.setPort(5672); // Note that this port is different from the management plugin's port
factory.setPassword("pwd");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Declare a durable exchange of type direct
channel.exchangeDeclare("exchange_name", BuiltinExchangeType.DIRECT, true);
// Declare a non-durable, auto-delete queue (deleted once it is no longer in use)
channel.queueDeclare("queue_name", false, false, true, null);
// Bind the queue to the exchange
channel.queueBind("queue_name", "exchange_name", "route_key");
// The callback is invoked once for every message delivered to this consumer
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
            byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    }
};
// autoAck = true: messages are acknowledged automatically on delivery
channel.basicConsume("queue_name", true, consumer);