Sending RabbitMQ messages with Vert.x
2022-07-03 07:26:00 【Sleeping Empire】
Introduction
The Vert.x RabbitMQ client is a Vert.x client that lets applications interact with a RabbitMQ broker (AMQP 0.9.1).
1. Maven project dependencies
<dependencies>
  <dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-web</artifactId>
  </dependency>
  <dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-rabbitmq-client</artifactId>
  </dependency>
  <dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-config-yaml</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
  </dependency>
  <dependency>
    <groupId>com.lance.common</groupId>
    <artifactId>vertx-common-core</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </dependency>
</dependencies>
2. YAML configuration file
server:
  host: 127.0.0.1
  port: 18005
rabbit:
  host: 127.0.0.1
  port: 18003
  username: root
  password: root
  virtualHost: /root
  connectionTimeout: 6000
  requestedHeartbeat: 60
  handshakeTimeout: 6000
  requestedChannelMax: 5
  networkRecoveryInterval: 500
  automaticRecoveryEnabled: true
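The `rabbit` section is later mapped onto `ConfigProperties.MqProperties`, a class from `vertx-common-core` that this article does not show. As a rough illustration only, the sketch below shows what a typed holder for these keys might look like in plain Java. The class name `RabbitSettings`, the `fromMap` factory, and the fallback values are all hypothetical and are not taken from the original project.

```java
import java.util.Map;

/** Hypothetical stand-in for MqProperties; field names mirror the YAML keys above. */
final class RabbitSettings {
  // Illustrative fallbacks only (5672 and guest are RabbitMQ's stock defaults)
  String host = "127.0.0.1";
  int port = 5672;
  String username = "guest";
  String password = "guest";
  String virtualHost = "/";
  int connectionTimeout;
  int requestedHeartbeat;
  int handshakeTimeout;
  int requestedChannelMax;
  long networkRecoveryInterval;
  boolean automaticRecoveryEnabled;

  /** Copies the keys that are present in the map and keeps the fallback otherwise. */
  static RabbitSettings fromMap(Map<String, Object> rabbit) {
    RabbitSettings s = new RabbitSettings();
    s.host = (String) rabbit.getOrDefault("host", s.host);
    s.port = intOf(rabbit, "port", s.port);
    s.username = (String) rabbit.getOrDefault("username", s.username);
    s.password = (String) rabbit.getOrDefault("password", s.password);
    s.virtualHost = (String) rabbit.getOrDefault("virtualHost", s.virtualHost);
    s.connectionTimeout = intOf(rabbit, "connectionTimeout", s.connectionTimeout);
    s.requestedHeartbeat = intOf(rabbit, "requestedHeartbeat", s.requestedHeartbeat);
    s.handshakeTimeout = intOf(rabbit, "handshakeTimeout", s.handshakeTimeout);
    s.requestedChannelMax = intOf(rabbit, "requestedChannelMax", s.requestedChannelMax);
    s.networkRecoveryInterval =
        ((Number) rabbit.getOrDefault("networkRecoveryInterval", s.networkRecoveryInterval)).longValue();
    s.automaticRecoveryEnabled =
        (Boolean) rabbit.getOrDefault("automaticRecoveryEnabled", s.automaticRecoveryEnabled);
    return s;
  }

  // YAML numbers may arrive as Integer or Long, so go through Number
  private static int intOf(Map<String, Object> m, String key, int fallback) {
    return ((Number) m.getOrDefault(key, fallback)).intValue();
  }
}
```

With a Vert.x `JsonObject`, the same idea could be fed by `RabbitSettings.fromMap(config.getJsonObject("rabbit").getMap())`; the article itself relies on `JsonObject.mapTo(...)` and Jackson instead.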
3. Load the configuration file at startup and pass it into the verticle config
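import io.vertx.config.ConfigRetriever;
import io.vertx.config.ConfigRetrieverOptions;
import io.vertx.config.ConfigStoreOptions;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;

public class RabbitmqApplication {
  public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    ConfigRetriever retriever = readYaml(vertx);
    retriever.getConfig(json -> {
      // Stop early if the configuration could not be loaded
      if (json.failed()) {
        json.cause().printStackTrace();
        vertx.close();
        return;
      }
      JsonObject object = json.result();
      // Create the shared RabbitMQ client from the "rabbit" section
      ClientHelper clientHelper = new ClientHelper(object.getJsonObject("rabbit"), vertx);
      clientHelper.afterPropertiesSet();
      // Hand the full configuration to the main verticle
      DeploymentOptions options = new DeploymentOptions().setConfig(object);
      vertx.deployVerticle(MainApp.class.getName(), options);
    });
  }

  // Reads application.yaml through the vertx-config-yaml format
  private static ConfigRetriever readYaml(Vertx vertx) {
    ConfigStoreOptions store = new ConfigStoreOptions()
        .setType("file")
        .setFormat("yaml")
        .setOptional(true)
        .setConfig(new JsonObject().put("path", "application.yaml"));
    return ConfigRetriever.create(vertx, new ConfigRetrieverOptions().addStore(store));
  }
}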
4. RabbitMQ client connection configuration
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.rabbitmq.RabbitMQClient;
import io.vertx.rabbitmq.RabbitMQConsumer;
import io.vertx.rabbitmq.RabbitMQOptions;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

// Lombok annotations assumed here: the listing uses `log` and a
// (JsonObject, Vertx) constructor without declaring either.
@Slf4j
@RequiredArgsConstructor
public class ClientHelper {
  private final JsonObject object;
  private final Vertx vertx;
  @Getter
  private static RabbitMQClient client;

  /**
   * Initialize the RabbitMQ client connection
   */
  public void afterPropertiesSet() {
    ConfigProperties.MqProperties prop = object.mapTo(ConfigProperties.MqProperties.class);
    RabbitMQOptions config = new RabbitMQOptions();
    // Each parameter is optional
    // The default value is used if a parameter is not set
    config.setUser(prop.getUsername());
    config.setPassword(prop.getPassword());
    config.setHost(prop.getHost());
    config.setPort(prop.getPort());
    config.setVirtualHost(prop.getVirtualHost());
    config.setReconnectAttempts(prop.getReconnectAttempts());
    config.setReconnectInterval(prop.getReconnectInterval());
    config.setConnectionTimeout(prop.getConnectionTimeout());
    config.setRequestedHeartbeat(prop.getRequestedHeartbeat());
    config.setHandshakeTimeout(prop.getHandshakeTimeout());
    config.setRequestedChannelMax(prop.getRequestedChannelMax());
    config.setNetworkRecoveryInterval(prop.getNetworkRecoveryInterval());
    config.setAutomaticRecoveryEnabled(prop.isAutomaticRecoveryEnabled());
    client = RabbitMQClient.create(vertx, config);
    // Connect
    client.start(asyncResult -> {
      if (asyncResult.succeeded()) {
        consumer();
        log.warn("RabbitMQ successfully connected!");
      } else {
        log.error("Failed to connect to RabbitMQ: {}", asyncResult.cause().getMessage());
      }
    });
  }

  private void consumer() {
    // basicConsumer takes a queue name; this listing reuses the routing-key constant for it
    client.basicConsumer(MqConst.HELLO_ROUTING_KEY, result -> {
      if (result.succeeded()) {
        RabbitMQConsumer mqConsumer = result.result();
        mqConsumer.handler(message -> {
          log.info("Got message: {}", message.body().toString());
          log.info("Message[exchange: {}, routeKey: {}] receive success!", message.envelope().getExchange(), message.envelope().getRoutingKey());
        });
      } else {
        log.error("===>Queue receive fail: ", result.cause());
      }
    });
  }
}
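The listing above consumes from a queue and the next section publishes to an exchange, but nothing in the article declares either of them, so they presumably already exist on the broker. On a fresh broker, a sketch along the following lines could create them once the client has started. The names `hello_exchange` and `hello_route_key` come from the article's log output; the `direct` exchange type, the durability flags, and the helper class `TopologySketch` are assumptions, not part of the original project.

```java
import io.vertx.core.Future;
import io.vertx.rabbitmq.RabbitMQClient;

/** Hypothetical helper that declares the exchange, queue and binding used in this article. */
final class TopologySketch {
  static final String EXCHANGE = "hello_exchange";
  static final String ROUTING_KEY = "hello_route_key";
  // The article consumes from a queue named after the routing key
  static final String QUEUE = ROUTING_KEY;

  private TopologySketch() {}

  /** Must be called after client.start(...) has succeeded. */
  static Future<Void> declare(RabbitMQClient client) {
    // Assumed settings: direct exchange, durable, not auto-deleted
    return client.exchangeDeclare(EXCHANGE, "direct", true, false)
        // Assumed settings: durable, non-exclusive, not auto-deleted queue
        .compose(v -> client.queueDeclare(QUEUE, true, false, false))
        // Route messages published with ROUTING_KEY on EXCHANGE into QUEUE
        .compose(ok -> client.queueBind(QUEUE, EXCHANGE, ROUTING_KEY));
  }
}
```

One way to use it would be to call `TopologySketch.declare(client).onSuccess(v -> consumer())` inside the `start` callback, so that the consumer is only registered once the queue exists.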
5. Message sending example
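import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.Json;
import io.vertx.ext.web.RoutingContext;
import lombok.extern.slf4j.Slf4j;

// @Slf4j is assumed: the listing uses `log` without declaring it
@Slf4j
public class UserService {
  /**
   * Send the request body as a text (JSON) message
   */
  public void sendMessage(RoutingContext ctx) {
    UserVo userVo = ctx.getBodyAsJson().mapTo(UserVo.class);
    Buffer message = Buffer.buffer(Json.encode(userVo));
    ClientHelper.getClient().basicPublish(MqConst.HELLO_EXCHANGE, MqConst.HELLO_ROUTING_KEY, message, result -> {
      if (result.succeeded()) {
        log.info("Message[exchange: {}, routeKey: {}] published success!", MqConst.HELLO_EXCHANGE, MqConst.HELLO_ROUTING_KEY);
      } else {
        log.error("Message send fail: ", result.cause());
      }
    });
    // The HTTP response is sent immediately, without waiting for the publish result
    ctx.json(R.success("success"));
  }
}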
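The handler above answers the HTTP request right away, so the caller gets a success response even if the publish fails afterwards. If the response should reflect the publish outcome, one possible variant is to reply from the publish result instead. This is a minimal sketch and not the author's implementation: the class name `ConfirmedSendSketch`, the injected client, the response payload, and the status codes are assumptions, and the exchange and routing key are the values seen in the article's logs.

```java
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import io.vertx.rabbitmq.RabbitMQClient;

/** Hypothetical handler that replies only after the publish call has completed. */
final class ConfirmedSendSketch {
  static final String EXCHANGE = "hello_exchange";
  static final String ROUTING_KEY = "hello_route_key";

  private final RabbitMQClient client;

  ConfirmedSendSketch(RabbitMQClient client) {
    this.client = client;
  }

  void sendMessage(RoutingContext ctx) {
    // Requires a BodyHandler on the route, as in the original listing
    JsonObject body = ctx.getBodyAsJson();
    if (body == null) {
      ctx.fail(400); // no JSON body supplied
      return;
    }
    client.basicPublish(EXCHANGE, ROUTING_KEY, Buffer.buffer(body.encode()))
        // Reply with success only once the publish call has succeeded
        .onSuccess(v -> ctx.json(new JsonObject().put("status", "published")))
        // Otherwise surface the failure to the caller as a 500
        .onFailure(err -> ctx.fail(500, err));
  }
}
```

The trade-off is latency: the request now waits for the broker round trip, whereas the original handler returns as soon as the publish has been queued.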
6. Logs
2022-02-13 14:43:13.826 INFO 24 --- [ntloop-thread-1] com.lance.rabbit.service.UserService ---[ 30] : Message[exchange: hello_exchange, routeKey: hello_route_key] published success!
2022-02-13 14:43:13.893 INFO 19 --- [ntloop-thread-0] com.lance.rabbit.config.ClientHelper ---[ 67] : Got message: {"name":"Jim Green","title":"book"}
2022-02-13 14:43:13.893 INFO 19 --- [ntloop-thread-0] com.lance.rabbit.config.ClientHelper ---[ 68] : Message[exchange: hello_exchange, routeKey: hello_route_key] receive success!
7. Full project address