Sending MQTT messages between a Vert.x client and server
2022-07-03 07:25:00 【Sleeping Empire】
Introduction
Sending MQTT messages with Vert.x, covering both the client and the server.
1. Maven project dependencies
<dependencies>
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-web</artifactId>
    </dependency>
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-mqtt</artifactId>
    </dependency>
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-config-yaml</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <dependency>
        <groupId>com.lance.common</groupId>
        <artifactId>vertx-common-core</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </dependency>
</dependencies>
2. YAML configuration file
server:
  host: 127.0.0.1
  port: 18003
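The server verticle in the next section maps this configuration onto a ConfigProperties object with config().mapTo(ConfigProperties.class). That class belongs to the author's vertx-common-core module and is not shown in this article, so the following is only a guess at its minimal shape, inferred from the getServer().getHost() and getServer().getPort() calls. The nested ServerProperties name is hypothetical.

```java
/**
 * Hypothetical sketch of ConfigProperties, inferred from the getters used in this article.
 * In a real project it would normally be a public class in its own file.
 */
class ConfigProperties {

    /** Mirrors the "server:" block of the YAML file (the class name is an assumption). */
    static class ServerProperties {
        private String host;
        private int port;

        public String getHost() { return host; }
        public void setHost(String host) { this.host = host; }
        public int getPort() { return port; }
        public void setPort(int port) { this.port = port; }

        @Override
        public String toString() { return host + ":" + port; }
    }

    private ServerProperties server;

    public ServerProperties getServer() { return server; }
    public void setServer(ServerProperties server) { this.server = server; }

    @Override
    public String toString() { return "ConfigProperties{server=" + server + "}"; }

    public static void main(String[] args) {
        // Build by hand the object that the YAML file above would describe.
        ServerProperties server = new ServerProperties();
        server.setHost("127.0.0.1");
        server.setPort(18003);
        ConfigProperties properties = new ConfigProperties();
        properties.setServer(server);
        System.out.println(properties);
    }
}
```

The no-argument constructors and getter/setter pairs matter here because mapTo relies on Jackson data binding, which by default populates plain Java beans through exactly those members.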
3. MQTT server configuration
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import java.nio.charset.Charset;
import lombok.extern.slf4j.Slf4j;

// ConfigProperties comes from the author's vertx-common-core module; its import is omitted
// because the package name is not shown in this article.
// Assumption: `log` is an SLF4J logger generated by Lombok's @Slf4j.
@Slf4j
public class MqttServerApp extends AbstractVerticle {
    private final static String CLIENT_ID = "clientHello";

    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        ConfigProperties properties = config().mapTo(ConfigProperties.class);
        int port = properties.getServer().getPort();
        log.info("===>json: {}, port: {}", properties, port);

        MqttServer mqttServer = MqttServer.create(vertx, create(properties));
        mqttServer.endpointHandler(endpoint -> {
            // shows main connect info
            log.info("MQTT client [{}] request to connect, clean session = {}", endpoint.clientIdentifier(), endpoint.isCleanSession());
            if (endpoint.auth() != null) {
                // logging credentials is acceptable for a demo only
                log.info("[username = {}, password = {}]", endpoint.auth().getUsername(), endpoint.auth().getPassword());
            }
            log.info("[properties = {}]", endpoint.connectProperties());
            if (endpoint.will() != null) {
                log.info("[will topic: {}, msg: {}, QoS: {}, isRetain: {}]", endpoint.will().getWillTopic(), endpoint.will().getWillMessageBytes(), endpoint.will().getWillQos(), endpoint.will().isWillRetain());
            }
            log.info("[keep alive timeout = {}]", endpoint.keepAliveTimeSeconds());

            // accept connection from the remote client
            endpoint.accept(true);
            receiver(endpoint);
            endpoint.disconnectMessageHandler(disconnectMessage -> log.info("Received disconnect from client, reason code = {}", disconnectMessage.code()));
        })
            .exceptionHandler(t -> log.error("MQTT exception fail: ", t))
            .listen(ar -> {
                if (ar.succeeded()) {
                    log.warn("MQTT server is listening on port: {}", ar.result().actualPort());
                    // complete the start promise, otherwise the verticle deployment never finishes
                    startPromise.complete();
                } else {
                    log.error("Fail on starting the server: ", ar.cause());
                    startPromise.fail(ar.cause());
                }
            });
    }

    // Handles messages published by the client and acknowledges them according to their QoS.
    private void receiver(MqttEndpoint endpoint) {
        endpoint.publishHandler(p -> {
            log.info("Server received message [{}] with QoS [{}]", p.payload().toString(Charset.defaultCharset()), p.qosLevel());
            if (p.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
                endpoint.publishAcknowledge(p.messageId());
            } else if (p.qosLevel() == MqttQoS.EXACTLY_ONCE) {
                endpoint.publishReceived(p.messageId());
            }
            // reply to the client on the same topic
            send(endpoint);
        })
            // second half of the QoS 2 handshake: answer PUBREL with PUBCOMP
            .publishReleaseHandler(endpoint::publishComplete);
    }

    private void send(MqttEndpoint endpoint) {
        Buffer payload = Buffer.buffer("server: hello world.");
        endpoint.publish(MqttClientApp.MQTT_TOPIC, payload, MqttQoS.AT_MOST_ONCE, false, false, s -> {
            if (s.succeeded()) {
                log.info("===>Server publish success: {}", s.result());
            } else {
                log.error("===>Server publish fail: ", s.cause());
            }
        });
    }

    // Builds the server options from the host and port in the YAML configuration.
    private MqttServerOptions create(ConfigProperties configProperties) {
        MqttServerOptions options = new MqttServerOptions();
        options.setPort(configProperties.getServer().getPort());
        options.setHost(configProperties.getServer().getHost());
        return options;
    }
}
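The receiver method above follows the MQTT acknowledgement rules: a QoS 0 message gets no reply, QoS 1 is answered with PUBACK, and QoS 2 is answered with PUBREC, followed by PUBCOMP once the sender's PUBREL arrives. As a library-independent sketch of that decision (the QosAckSketch class, its enums, and its method names are illustrative and not part of the Vert.x API):

```java
import java.util.Optional;

/** Illustrative model of which packet a receiver sends back for each QoS level. */
class QosAckSketch {

    enum Qos { AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }

    enum Packet { PUBACK, PUBREC, PUBREL, PUBCOMP }

    /** Reply to an incoming PUBLISH; empty for QoS 0, which is never acknowledged. */
    static Optional<Packet> replyToPublish(Qos qos) {
        switch (qos) {
            case AT_LEAST_ONCE:
                return Optional.of(Packet.PUBACK);
            case EXACTLY_ONCE:
                return Optional.of(Packet.PUBREC);
            default:
                return Optional.empty();
        }
    }

    /** Reply to an incoming PUBREL, which closes the QoS 2 exchange. */
    static Packet replyToRelease() {
        return Packet.PUBCOMP;
    }

    public static void main(String[] args) {
        for (Qos qos : Qos.values()) {
            String reply = replyToPublish(qos).map(Enum::name).orElse("(no reply)");
            System.out.println(qos + " -> " + reply);
        }
        System.out.println("PUBREL -> " + replyToRelease());
    }
}
```

In the verticle, publishAcknowledge, publishReceived, and publishComplete play the roles of PUBACK, PUBREC, and PUBCOMP respectively. Since both sides in this article publish with AT_MOST_ONCE, neither acknowledgement branch is exercised by the sample run.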
4. MQTT client configuration
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import java.nio.charset.Charset;
import lombok.extern.slf4j.Slf4j;
// Assumption: RandomStringUtils is the Apache Commons Lang 3 class.
import org.apache.commons.lang3.RandomStringUtils;

// Assumption: `log` is an SLF4J logger generated by Lombok's @Slf4j.
@Slf4j
public class MqttClientApp extends AbstractVerticle {
    public static final String MQTT_TOPIC = "hello_topic";

    @Override
    public void start() {
        MqttClient client = MqttClient.create(vertx, create());

        // handler will be called when we have a message in topic we subscribe for
        client.publishHandler(p -> {
            log.info("Client received message on [{}] payload [{}] with QoS [{}]", p.topicName(), p.payload().toString(Charset.defaultCharset()), p.qosLevel());
        });

        // host and port match the server settings in the YAML file
        client.connect(18003, "127.0.0.1", s -> {
            if (s.succeeded()) {
                log.info("Client connect success.");
                subscribe(client);
            } else {
                log.error("Client connect fail: ", s.cause());
            }
        }).exceptionHandler(event -> {
            // log the exception itself; its cause may be null
            log.error("client fail: ", event);
        });
    }

    private void subscribe(MqttClient client) {
        client.subscribe(MQTT_TOPIC, 0, e -> {
            if (e.succeeded()) {
                log.info("===>subscribe success: {}", e.result());
                // publish a message every 10 seconds once the subscription is in place
                vertx.setPeriodic(10_000, l -> publish(client));
            } else {
                log.error("===>subscribe fail: ", e.cause());
            }
        });
    }

    private void publish(MqttClient client) {
        Buffer payload = Buffer.buffer("client: hello world.");
        client.publish(MQTT_TOPIC, payload, MqttQoS.AT_MOST_ONCE, false, false, s -> {
            if (s.succeeded()) {
                log.info("===>Client publish success: {}", s.result());
            } else {
                log.error("===>Client publish fail: ", s.cause());
            }
        });
    }

    private MqttClientOptions create() {
        MqttClientOptions options = new MqttClientOptions();
        // random suffix so that several clients can connect without clashing identifiers
        options.setClientId("ClientId_" + RandomStringUtils.randomAlphanumeric(6));
        options.setMaxMessageSize(100_000_000);
        // keep-alive interval in seconds
        options.setKeepAliveInterval(2);
        return options;
    }
}
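The create() method builds the client identifier from a fixed prefix plus six random alphanumeric characters using RandomStringUtils, presumably the Apache Commons Lang class, which does not appear in the dependency list above. If you would rather avoid the extra dependency, the same idea can be sketched with the JDK alone; ClientIds and randomClientId are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.ThreadLocalRandom;

/** JDK-only sketch of generating a client identifier with a random alphanumeric suffix. */
class ClientIds {

    private static final String ALPHANUMERIC =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

    /** Returns the prefix followed by randomLength random letters or digits. */
    static String randomClientId(String prefix, int randomLength) {
        StringBuilder id = new StringBuilder(prefix);
        for (int i = 0; i < randomLength; i++) {
            int index = ThreadLocalRandom.current().nextInt(ALPHANUMERIC.length());
            id.append(ALPHANUMERIC.charAt(index));
        }
        return id.toString();
    }

    public static void main(String[] args) {
        // Same prefix and suffix length as the client options in this article.
        System.out.println(randomClientId("ClientId_", 6));
    }
}
```

A random suffix keeps identifiers distinct when several instances of the client verticle connect to the same server, at the cost of not being able to resume a previous session under the same identifier.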
5. Result
2022-01-25 19:06:53.244 WARN 21 --- [ntloop-thread-1] lver.dns.DnsServerAddressStreamProviders---[ 70] : Can not find io.netty.resolver.dns.macos.MacOSDnsServerAddressStreamProvider in the classpath, fallback to system defaults. This may result in incorrect DNS resolutions on MacOS.
2022-01-25 19:06:53.291 INFO 21 --- [ntloop-thread-1] io.vertx.mqtt.impl.MqttClientImpl ---[ ] : Connection with 127.0.0.1:18003 established successfully
2022-01-25 19:06:53.432 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 34] : Client connect success.
2022-01-25 19:06:53.512 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 47] : ===>subscribe success: 1
2022-01-25 19:07:03.537 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 59] : ===>publish success: 2
2022-01-25 19:07:03.551 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 29] : Client received message on [hello_topic] payload [server: hello world.] with QoS [AT_MOST_ONCE]
2022-01-25 19:07:13.518 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 59] : ===>publish success: 3
2022-01-25 19:07:13.521 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 29] : Client received message on [hello_topic] payload [server: hello world.] with QoS [AT_MOST_ONCE]
2022-01-25 19:07:23.521 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 59] : ===>publish success: 4
6. Full address of the project