当前位置:网站首页>Rabbit MQ message sending of vertx
Rabbit MQ message sending of vertx
2022-07-05 01:46:00 【Xiong Da believed what Xiong er said】
Introduce
Vert.x RabbitMQ client , Allow applications to interact with RabbitMQ Agent interactive Vert.x client (AMQP 0.9.1)
1. maven Project dependence
<dependencies> <dependency> <groupId>io.vertx</groupId> <artifactId>vertx-web</artifactId> </dependency> <dependency> <groupId>io.vertx</groupId> <artifactId>vertx-rabbitmq-client</artifactId> </dependency> <dependency> <groupId>io.vertx</groupId> <artifactId>vertx-config-yaml</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <dependency> <groupId>com.lance.common</groupId> <artifactId>vertx-common-core</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency></dependencies>2.YAML File configuration
server: host: 127.0.0.1 port: 18005rabbit: host: 127.0.0.1 port: 18003 username: root password: root virtualHost: /root connectionTimeout: 6000 requestedHeartbeat: 60 handshakeTimeout: 6000 requestedChannelMax: 5 networkRecoveryInterval: 500 automaticRecoveryEnabled: true3. Start loading profile , And into the config In the middle
public class RabbitmqApplication { public static void main(String[] args) { Vertx vertx = Vertx.vertx(); ConfigRetriever retriever = readYaml(vertx); retriever.getConfig(json -> { JsonObject object = json.result(); ClientHelper dbHelper = new ClientHelper(object.getJsonObject("rabbit"), vertx); dbHelper.afterPropertiesSet(); DeploymentOptions options = new DeploymentOptions().setConfig(object); vertx.deployVerticle(MainApp.class.getName(), options); }); } private static ConfigRetriever readYaml(Vertx vertx) { ConfigStoreOptions store = new ConfigStoreOptions() .setType("file") .setFormat("yaml") .setOptional(true) .setConfig(new JsonObject().put("path", "application.yaml")); return ConfigRetriever.create(vertx, new ConfigRetrieverOptions().addStore(store)); }}4.RabbitMQ client Connection configuration
public class ClientHelper { private final JsonObject object; private final Vertx vertx; @Getter private static RabbitMQClient client; /** * initialization mail client Connect */ public void afterPropertiesSet() { ConfigProperties.MqProperties prop = object.mapTo(ConfigProperties.MqProperties.class); RabbitMQOptions config = new RabbitMQOptions(); // Each parameter is optional // The default parameter with be used if the parameter is not set config.setUser(prop.getUsername()); config.setPassword(prop.getPassword()); config.setHost(prop.getHost()); config.setPort(prop.getPort()); config.setVirtualHost(prop.getVirtualHost()); config.setReconnectAttempts(prop.getReconnectAttempts()); config.setReconnectInterval(prop.getReconnectInterval()); config.setConnectionTimeout(prop.getConnectionTimeout()); config.setRequestedHeartbeat(prop.getRequestedHeartbeat()); config.setHandshakeTimeout(prop.getHandshakeTimeout()); config.setRequestedChannelMax(prop.getRequestedChannelMax()); config.setNetworkRecoveryInterval(prop.getNetworkRecoveryInterval()); config.setAutomaticRecoveryEnabled(prop.isAutomaticRecoveryEnabled()); client = RabbitMQClient.create(vertx, config); // Connect client.start(asyncResult -> { if (asyncResult.succeeded()) { consumer(); log.warn("RabbitMQ successfully connected!"); } else { log.error("Fail to connect to RabbitMQ {}", asyncResult.cause().getMessage()); } }); } private void consumer() { client.basicConsumer(MqConst.HELLO_ROUTING_KEY, result -> { if (result.succeeded()) { RabbitMQConsumer mqConsumer = result.result(); mqConsumer.handler(message -> { log.info("Got message: {}", message.body().toString()); log.info("Message[exchange: {}, routeKey: {}] receive success!", message.envelope().getExchange(), message.envelope().getRoutingKey()); }); } else { log.error("===>Queue receive fail: ", result.cause()); } }); }}5.Email Send execution instance
public class UserService { /** * send text content */ public void sendMessage(RoutingContext ctx) { UserVo userVo = ctx.getBodyAsJson().mapTo(UserVo.class); Buffer message = Buffer.buffer(Json.encode(userVo)); ClientHelper.getClient().basicPublish(MqConst.HELLO_EXCHANGE, MqConst.HELLO_ROUTING_KEY, message, result -> { if (result.succeeded()) { log.info("Message[exchange: {}, routeKey: {}] published success!", MqConst.HELLO_EXCHANGE, MqConst.HELLO_ROUTING_KEY); } else { log.error("Message send fail: ", result.cause()); } }); ctx.json(R.success("success")); }}6. journal
2022-02-13 14:43:13.826 INFO 24 --- [ntloop-thread-1] com.lance.rabbit.service.UserService ---[ 30] : Message[exchange: hello_exchange, routeKey: hello_route_key] published success!2022-02-13 14:43:13.893 INFO 19 --- [ntloop-thread-0] com.lance.rabbit.config.ClientHelper ---[ 67] : Got message: {"name":"Jim Green","title":"book"}2022-02-13 14:43:13.893 INFO 19 --- [ntloop-thread-0] com.lance.rabbit.config.ClientHelper ---[ 68] : Message[exchange: hello_exchange, routeKey: hello_route_key] receive success!7. Full address of the project
边栏推荐
- [Chongqing Guangdong education] National Open University spring 2019 1042 international economic law reference questions
- Complex, complicated and numerous: illustration of seven types of code coupling
- MySQL REGEXP:正则表达式查询
- pytorch fine-tuning (funtune) : 镂空设计or 偷梁换柱
- Yyds dry goods inventory kubernetes management business configuration methods? (08)
- Introduction to redis (1)
- If the consumer Internet is compared to a "Lake", the industrial Internet is a vast "ocean"
- Include rake tasks in Gems - including rake tasks in gems
- Win: enable and disable USB drives using group policy
- Do you know the eight signs of a team becoming agile?
猜你喜欢

Exploration and Practice of Stream Batch Integration in JD

Wechat applet; Gibberish generator
![[swagger]-swagger learning](/img/60/1dbe074b3c66687867192b0817b553.jpg)
[swagger]-swagger learning

How to build a technical team that will bring down the company?

Logstash、Fluentd、Fluent Bit、Vector? How to choose the appropriate open source log collector

Practice of tdengine in TCL air conditioning energy management platform

Visual explanation of Newton iteration method

Application and Optimization Practice of redis in vivo push platform

Wechat applet: wechat applet source code download new community system optimized version support agent member system function super high income

runc hang 导致 Kubernetes 节点 NotReady
随机推荐
Learn tla+ (XII) -- functions through examples
Great God developed the new H5 version of arXiv, saying goodbye to formula typography errors in one step, and mobile phones can also easily read literature
Huawei machine test question: longest continuous subsequence
Abacus mental arithmetic test
Common bit operation skills of C speech
Wechat applet: independent background with distribution function, Yuelao office blind box for making friends
Mysql database | build master-slave instances of mysql-8.0 or above based on docker
Database postragesql client authentication
JS implementation determines whether the point is within the polygon range
JVM - when multiple threads initialize the same class, only one thread is allowed to initialize
PHP wechat official account development
Application and Optimization Practice of redis in vivo push platform
PowerShell:在代理服务器后面使用 PowerShell
Wechat applet: the latest WordPress black gold wallpaper wechat applet two open repair version source code download support traffic main revenue
The server time zone value ‘� й ��� ʱ 'is unrecognized or representatives more than one time zone【
MySQL backup and recovery + experiment
Redis master-slave replication cluster and recovery ideas for abnormal data loss # yyds dry goods inventory #
node工程中package.json文件作用是什么?里面的^尖括号和~波浪号是什么意思?
Win: enable and disable USB drives using group policy
The application and Optimization Practice of redis in vivo push platform is transferred to the end of metadata by