当前位置:网站首页>15.federation
15.federation
2022-07-29 04:14:00 【Machoul】
federation和shovel
federation-exchange
问题的由来:
城市A有rabbitmqA,城市B有rabbitmqB,当城市B的应用要发消息到exchangeA的时候,会因为网络原因,导致发送时间延时。
federation-exchange的作用:
federation提供了一个能力,让城市B的mq去接收exchangeA的消息,然后再把消息转发到城市A的exchangeA
案例演示
准备两台rabbitmq服务,保证每台节点单独运行
在每台机器上开启federation相关插件
rabbitmq-plugins enable rabbitmq_federation --offline rabbitmq-plugins enable rabbitmq_federation_managemen --offline
开启后在管理台页面发现新增选项卡
运行
ConsumerFeb
的代码- 通过
ConsumerFeb
代码创建了fed-queue
和fed-exchange
/** * Feb 消息消费者 */ public class ConsumerFeb { private static final String QUEUE_NAME="fed-queue"; private static final String EXCHANGE_NAME="fed-exchange"; public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("172.16.140.131"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("123456"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"aaa"); System.out.println("等待接收消息"); //推送的消息如何进行消费的接口回调 DeliverCallback deliverCallback = (consumerTag, message) -> { String result = new String(message.getBody()); System.out.println("消费者接收到消息,消息内容为:"+result); }; //取消消费的一个回调接口 CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断"); }; channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
- 通过
Feb添加upstream
添加policy
查看federation status
添加ConsumerJan消费者代码
/** * Jan 消息消费者 */ public class ConsumerJan { private static final String QUEUE_NAME="federation-queue"; public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("172.16.140.130"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("123456"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); System.out.println("等待接收消息"); //推送的消息如何进行消费的接口回调 DeliverCallback deliverCallback = (consumerTag, message) -> { String result = new String(message.getBody()); System.out.println("消费者接收到消息,消息内容为:"+result); }; //取消消费的一个回调接口 CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断"); }; channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
添加生产者代码
/** * federation-exchange-消息生产者 */ public class Producer { private static final String EXCHANGE_NAME="fed-exchange"; public static void main(String[] args) throws Exception { //创建一个连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("172.16.140.130"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("123456"); //获取连接 Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String message = "hello world"; channel.basicPublish(EXCHANGE_NAME, "aaa", null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("消息发送完毕"); } }
启动消费者Feb,消费者Jan,再启动生产者
- 发现生产者发送消息到Jan,通过federation将消息转发到Feb,在消费者Feb中得到输出
federation-queue
shovel
开启插件
rabbitmq-plugins enable rabbitmq_shovel --offline rabbitmq-plugins enable rabbitmq_shovel_management --offline
边栏推荐
- 不会就坚持59天吧 替换单词
- [paper translation] vectornet: encoding HD maps and agent dynamics from vectorized representation
- Basic configuration of BGP - establish peers and route announcements
- Shielding ODBC load balancing mode in gbase 8A special scenarios?
- pat A1041 Be Unique
- Lua language (stm32+2g/4g module) and C language (stm32+esp8266) methods of extracting relevant data from strings - collation
- The table of antd hides the pager when there is only one page
- Taobao product details interface (product details page data interface)
- “蔚来杯“2022牛客暑期多校训练营1 J Serval and Essay(启发式合并)
- 不会就坚持71天吧 链表排序
猜你喜欢
C语言力扣第61题之旋转链表。双端队列与构造循环链表
rman不标记过期备份
Machine vision Series 1: Visual Studio 2019 dynamic link library DLL establishment
The principle of inverse Fourier transform (IFFT) in signal processing
Install the laser of ROS_ scan_ Problems encountered in match library (I)
Function pointer and callback function
Is the array name a pointer
Common components of solder pad (2021.4.6)
Whole house WiFi solution: mesh router networking and ac+ap
mmdetection初步使用
随机推荐
请问,在sql client中,执行insert into select from job时,如何单
信号处理中的反傅里叶变换(IFFT)原理
优炫数据库有办法查到主集群每天传给备集群的日志量吗?
The pit I walked through: the first ad Sketchpad
Do you have a boss to help me check whether the parameter configuration of the Flink SQL connection Kafka authentication Kerberos is wrong
Mmdetection preliminary use
Fuzzy query of SQL
rman不标记过期备份
Shielding ODBC load balancing mode in gbase 8A special scenarios?
What the hell is this error? It doesn't affect the execution result, but it always reports errors when executing SQL... Connecting maxcomputer uses
openFeign异步调用问题
C语言:联合体知识点总结
HCIP BGP
Openfeign asynchronous call problem
LCA 板子
After I get the winfrom specific control ID from the database, I need to find the corresponding control through this ID and assign a value to the text text of the control. What should I do
数据库SQL语句实现数据分解的函数查询
不会就坚持61天吧 最短的单词编码
Machine vision Series 1: Visual Studio 2019 dynamic link library DLL establishment
SQL server how to judge when the parameter received by the stored procedure is of type int?