15.federation
2022-07-29 04:15:00 【Machoul】
federation and shovel
federation-exchange
The problem:
City A has a broker, rabbitmqA, and city B has a broker, rabbitmqB. When an application in city B sends a message to exchangeA in city A, network latency between the two cities slows down the publish.
What federation-exchange does:
Federation lets the broker in city B accept the messages destined for exchangeA locally, then forwards them to exchangeA in city A.

Case demonstration
Prepare two RabbitMQ servers and make sure each node runs independently.
Enable the federation plugins on each machine:
rabbitmq-plugins enable rabbitmq_federation --offline
rabbitmq-plugins enable rabbitmq_federation_management --offline
After enabling them, a new tab appears on the management console page.

Run
ConsumerFeb code
- The ConsumerFeb code creates fed-queue and fed-exchange
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;

/**
 * Feb message consumer
 */
public class ConsumerFeb {
    private static final String QUEUE_NAME = "fed-queue";
    private static final String EXCHANGE_NAME = "fed-exchange";

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("172.16.140.131");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange and queue on Feb and bind them with routing key "aaa"
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "aaa");
        System.out.println("Waiting to receive messages");
        // Callback invoked for each message pushed to this consumer
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String result = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("Consumer received message: " + result);
        };
        // Callback invoked when consumption is cancelled
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("Message consumption was interrupted");
        };
        // Auto-ack is enabled (second argument is true)
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}
- On Feb, add the upstream
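The upstream is added through the management console in this walkthrough. As a rough sketch of the same step done programmatically, the class below builds the management HTTP API request that defines a federation upstream on Feb pointing at Jan. It only constructs the request and does not send it. The management port 15672, the upstream name `jan-upstream`, and the class name `FederationUpstreamRequest` are assumptions for illustration; the hosts and credentials are the ones used in the consumer code.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Builds (but does not send) the request that defines a federation upstream. */
final class FederationUpstreamRequest {

    /** Path of the federation-upstream parameter; the default vhost "/" becomes %2F. */
    static String path(String vhost, String upstreamName) {
        // URLEncoder is enough for names without spaces (it would turn a space into '+')
        return "/api/parameters/federation-upstream/"
                + URLEncoder.encode(vhost, StandardCharsets.UTF_8) + "/"
                + URLEncoder.encode(upstreamName, StandardCharsets.UTF_8);
    }

    /** JSON body carrying the AMQP URI of the upstream broker. */
    static String body(String upstreamAmqpUri) {
        return "{\"value\":{\"uri\":\"" + upstreamAmqpUri + "\"}}";
    }

    /** PUT request with HTTP basic authentication against the downstream node. */
    static HttpRequest build(String managementBase, String user, String password,
                             String vhost, String upstreamName, String upstreamAmqpUri) {
        String credentials = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(URI.create(managementBase + path(vhost, upstreamName)))
                .header("Authorization", "Basic " + credentials)
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body(upstreamAmqpUri)))
                .build();
    }

    public static void main(String[] args) {
        // Assumed: Feb's management API listens on port 15672; "jan-upstream" is a made-up name
        String upstreamUri = "amqp://admin:123456@172.16.140.130";
        HttpRequest request = build("http://172.16.140.131:15672", "admin", "123456",
                "/", "jan-upstream", upstreamUri);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(body(upstreamUri));
    }
}
```

To actually apply it, the request could be passed to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` (Java 11 or later) while the Feb node is running.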

Add a policy
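The policy decides which exchanges are federated. The sketch below builds the management HTTP API request for such a policy and checks that its pattern would cover fed-exchange. The pattern `^fed.*`, the policy name `fed-policy`, the port 15672, and the class name `FederationPolicyRequest` are assumptions, since the original settings were shown only in a screenshot.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.regex.Pattern;

/** Builds (but does not send) the request that creates a federation policy. */
final class FederationPolicyRequest {

    /** Path of the policy; the default vhost "/" becomes %2F. */
    static String path(String vhost, String policyName) {
        return "/api/policies/"
                + URLEncoder.encode(vhost, StandardCharsets.UTF_8) + "/"
                + URLEncoder.encode(policyName, StandardCharsets.UTF_8);
    }

    /** Policy body: federate matching exchanges with every defined upstream. */
    static String body(String pattern) {
        // The pattern is inserted as-is, so it must not contain quotes or backslashes
        return "{\"pattern\":\"" + pattern + "\","
                + "\"definition\":{\"federation-upstream-set\":\"all\"},"
                + "\"apply-to\":\"exchanges\"}";
    }

    /** Local check of whether an exchange name would be matched by the pattern. */
    static boolean applies(String pattern, String exchangeName) {
        return Pattern.compile(pattern).matcher(exchangeName).find();
    }

    /** PUT request with HTTP basic authentication against the downstream node. */
    static HttpRequest build(String managementBase, String user, String password,
                             String vhost, String policyName, String pattern) {
        String credentials = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(URI.create(managementBase + path(vhost, policyName)))
                .header("Authorization", "Basic " + credentials)
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body(pattern)))
                .build();
    }

    public static void main(String[] args) {
        // Assumed values: port 15672, policy name "fed-policy", pattern "^fed.*"
        HttpRequest request = build("http://172.16.140.131:15672", "admin", "123456",
                "/", "fed-policy", "^fed.*");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(body("^fed.*"));
        System.out.println("matches fed-exchange: " + applies("^fed.*", "fed-exchange"));
    }
}
```

Setting `federation-upstream-set` to `all` is the simplest choice when only one upstream is defined; the local `applies` check uses Java regular expressions, which agree with the broker for a simple prefix pattern like this one.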

Check the federation status
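The status is read from the management console here. As a hedged alternative, the federation management plugin also exposes the link list over HTTP at `/api/federation-links`; the sketch below builds that request for the Feb node. The port 15672 and the class name `FederationStatusRequest` are assumptions, and the request is only constructed, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Builds (but does not send) the request that lists federation links. */
final class FederationStatusRequest {

    /** GET request with HTTP basic authentication against the downstream node. */
    static HttpRequest build(String managementBase, String user, String password) {
        String credentials = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(URI.create(managementBase + "/api/federation-links"))
                .header("Authorization", "Basic " + credentials)
                .GET()
                .build();
    }

    public static void main(String[] args) {
        // Assumed: Feb's management API listens on port 15672
        HttpRequest request = build("http://172.16.140.131:15672", "admin", "123456");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending it with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` should return a JSON list of links, which can be compared with what the console tab shows.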

Add the ConsumerJan consumer code
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;

/**
 * Jan message consumer
 */
public class ConsumerJan {
    private static final String QUEUE_NAME = "federation-queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("172.16.140.130");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("Waiting to receive messages");
        // Callback invoked for each message pushed to this consumer
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String result = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("Consumer received message: " + result);
        };
        // Callback invoked when consumption is cancelled
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("Message consumption was interrupted");
        };
        // Auto-ack is enabled (second argument is true)
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}
Add the producer code
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

/**
 * federation-exchange message producer
 */
public class Producer {
    private static final String EXCHANGE_NAME = "fed-exchange";

    public static void main(String[] args) throws Exception {
        // Create a connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("172.16.140.130");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");
        // Open the connection and channel; both are closed when the block exits
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            String message = "hello world";
            // Publish to fed-exchange on Jan with routing key "aaa"
            channel.basicPublish(EXCHANGE_NAME, "aaa", null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("Message sent");
        }
    }
}
Start consumer Feb and consumer Jan, then start the producer
- The producer sends the message to Jan, federation forwards it to Feb, and the output appears in consumer Feb
federation-queue

shovel
Enable the plugins
rabbitmq-plugins enable rabbitmq_shovel --offline
rabbitmq-plugins enable rabbitmq_shovel_management --offline
