当前位置:网站首页>Hello world of rabbitmq

Hello world of rabbitmq

2020-11-08 16:10:00 Wang banxian'er's blog

pom

 

Premise is Maven engineering , use idea build

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>RabbitmqDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- RabbitMQ -->
   <dependencies>
       <dependency>
           <groupId>com.rabbitmq</groupId>
           <artifactId>amqp-client</artifactId>
           <version>4.2.1</version>
       </dependency>
   </dependencies>

</project>

   producer class

package Producter;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitProducer {
    private static final String EXCHANGE_NAME = "exchange_demo";
    private static final String ROUNTING_KEY = "rountingkey_demo";
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = " Virtual machine or windows Of ip";
    /**
     * Rabbitmq Default port 
     * */
    private static final int PORT = 5672;

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("admin");
        factory.setPassword("admin");
        //  Create connection 
        Connection connection = factory.newConnection();
        //  Create channel 
        Channel channel = connection.createChannel();
        //  establish   Persistence   Switches that are not automatically deleted 
        channel.exchangeDeclare(EXCHANGE_NAME,
                "direct", true, false, null);
        //  Create persistent non exclusive , Non auto deleted queues 
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //  Switches and queues are bound by routing keys 
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUNTING_KEY);
        // send  Persistent message  helloworld!
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, ROUNTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        //  close resource 
        channel.close();
        connection.close();

    }
}

   consumer class

package Consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RabbitConsumer {
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = " Same as above ";

    /**
     * Rabbitmq Default port 
     * */
    private static final int PORT = 5672;

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Address[] addresses = new Address[]{
                new Address(IP_ADDRESS, PORT)
        };
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("admin");
        factory.setPassword("admin");
        //  The connection is slightly different 
        //  Create connection 
        Connection connection = factory.newConnection(addresses);
        //  Create channel 
        final Channel channel = connection.createChannel();
        //  At most, the client accepts that it has not been ack The number of messages 
        channel.basicQos(64);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("recv message: " + new String(body));
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, consumer);
        TimeUnit.SECONDS.sleep(5);
        channel.close();
        connection.close();

    }

}

  

版权声明
本文为[Wang banxian'er's blog]所创,转载请带上原文链接,感谢