当前位置:网站首页>死信队列入门(两个消费者,一个生产者)

死信队列入门(两个消费者,一个生产者)

2022-07-05 20:11:00 为什么不好好卖蛋饼

整理一下死信队列。

无法被消费的消息

死信来源

消息TTL过期
队列达到最大长度

队列满了,无法再添加消息到mq

消息被拒绝

50 死信代码架构图

在这里插入图片描述

直接交换机 zhangsan 被C1消费

死信交换机 lisi 添加到dead-queue 被C2消费

51 c1 消费者

public class Consumer01{
    
    //普通交换机名称
    public static final String NORMAL_EXCHANGE="normal_exchange";
    //死信交换机
    public static final String DEAD_EXCHANGE="dead_exchange";
    //普通队列名称
    public static final String NORMAL_QUEUE="normal_queue";
    //死信队列名称
    public static final String DEAD_QUEUE="dead_queue";
    public static void main() throws Exception{
    
        
        Channel channel=RabbitMqUtil.getChannel();

        //声明普通交换机 /死信交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
        //声明普通队列
        Map<String ,Object> arguments=new HashMap<>();
        //过期时间
        //arguments.put("x-message-ttl",1000000);
        //正常队列设置过期之后的死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","lisi");
    	channel.queueDeclare(NORMAL_QUEUE,false,false,false,null);
        
        //死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        //绑定普通交换机和队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        
        //绑定死信交换机和死信队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        sout("等待接收消息");
        //回调函数
        DeliverCallback deliverCallback=(consumerTag,message)->{
    
            sout("Consumer01接收的消息时"+new String(message.getBody(),"UTF-8"));
        };
        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumeTag->{
    });    
    }
}

52 生产者

public class Producer{
    
    //普通交换机名称
    public static final String NORMAL_EXCHANGE="normal_exchange";
    public static void main() throws Exception{
    
        Channel channel=RabbitMaUtils.getChannel();
        //死信消息
AMQP.BasicProperties=new AMQP.BasicProperties().builder().expration("10000").build();
        
        //死信消息 设置ttl时间
        for(int i=1;i<11;i++){
    
            String message="info"+i;
    channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
		}
    }
}

停掉消费者,消息转发到死信队列

53 消费者2

这个简单,就是单纯的接收普通队列转发来的消息进行消费。

public class Consumer02{
    
   
    //死信队列名称
    public static final String DEAD_QUEUE="dead_queue";
    public static void main() throws Exception{
    
        Channel channel=RabbitMqUtil.getChannel();
        sout("等待接收消息");
        
        DeliverCallback deliverCallback=(consumerTag,message)->{
    
            sout("Consumer02接收的消息时"+new String(message.getBody(),"UTF-8"));
        };
        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumeTag->{
    });    
    }
}
原网站

版权声明
本文为[为什么不好好卖蛋饼]所创,转载请带上原文链接,感谢
https://blog.csdn.net/qq_42938698/article/details/125587107