当前位置:网站首页>Sent by mqtt client server of vertx
Sending MQTT messages between a Vert.x client and server
2022-07-03 07:25:00 【Sleeping Empire】
Introduction
Sending MQTT messages with Vert.x: a client and a server.
1. Maven project dependencies
<dependencies>
<!-- Dependencies used by the MQTT server and client verticles shown in this article. -->
<!-- NOTE(review): the io.vertx and Jackson artifacts carry no <version>; presumably the versions
     are managed by a parent POM or an imported BOM that is not shown here - confirm. -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mqtt</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-config-yaml</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Project-local module. NOTE(review): the code in this article also uses a logger (`log`) and
     RandomStringUtils, which are not declared in this list; presumably they arrive transitively
     through this module - verify. -->
<dependency>
<groupId>com.lance.common</groupId>
<artifactId>vertx-common-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
2. YAML configuration file
server:
  host: 127.0.0.1
  port: 18003
3. MQTT server
/**
 * Verticle that starts an MQTT server on the host and port read from the verticle configuration.
 *
 * <p>Every client connection is accepted. Each message a client publishes is acknowledged
 * according to its QoS level and answered with a fixed greeting published on
 * {@link MqttClientApp#MQTT_TOPIC}.
 *
 * <p>NOTE(review): {@code log} is not declared in this snippet; presumably it is supplied by an
 * annotation (e.g. Lombok {@code @Slf4j}) that the article omitted - confirm.
 */
public class MqttServerApp extends AbstractVerticle {
  /**
   * Charset used to decode received payloads for logging. Fixed to UTF-8 instead of the platform
   * default so the log output does not depend on the JVM's locale settings.
   */
  private static final Charset PAYLOAD_CHARSET = java.nio.charset.StandardCharsets.UTF_8;

  /**
   * Creates the MQTT server, registers the connection handling and starts listening.
   *
   * @param startPromise completed once the server is listening, failed with the bind error
   *     otherwise, so that the deployment of this verticle reports the real outcome
   * @throws Exception if the verticle configuration cannot be mapped to {@code ConfigProperties}
   */
  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    ConfigProperties properties = config().mapTo(ConfigProperties.class);
    int port = properties.getServer().getPort();
    log.info("===>json: {}, port: {}", properties, port);

    MqttServer mqttServer = MqttServer.create(vertx, create(properties));
    mqttServer.endpointHandler(endpoint -> {
      // shows main connect info
      log.info("MQTT client [{}] request to connect, clean session = {}", endpoint.clientIdentifier(), endpoint.isCleanSession());
      if (endpoint.auth() != null) {
        // Never write credentials to the log: only record whether a password was supplied.
        log.info("[username = {}, password provided = {}]", endpoint.auth().getUsername(), endpoint.auth().getPassword() != null);
      }
      log.info("[properties = {}]", endpoint.connectProperties());
      if (endpoint.will() != null) {
        log.info("[will topic: {}, msg: {}, QoS: {}, isRetain: {}]", endpoint.will().getWillTopic(), endpoint.will().getWillMessageBytes(), endpoint.will().getWillQos(), endpoint.will().isWillRetain());
      }
      log.info("[keep alive timeout = {}]", endpoint.keepAliveTimeSeconds());

      // Accept the connection. This server keeps no session state between connections, so the
      // CONNACK must report "session present = false".
      endpoint.accept(false);
      receiver(endpoint);
      endpoint.disconnectMessageHandler(disconnectMessage -> log.info("Received disconnect from client, reason code = {}", disconnectMessage.code()));
    })
      .exceptionHandler(t -> log.error("MQTT exception fail: ", t))
      .listen(ar -> {
        if (ar.succeeded()) {
          log.warn("MQTT server is listening on port: {}", ar.result().actualPort());
          startPromise.complete();
        } else {
          log.error("Fail on starting the server: ", ar.cause());
          startPromise.fail(ar.cause());
        }
      });
  }

  /**
   * Registers the handlers that process messages published by a connected client.
   *
   * <p>QoS 1 messages are answered with {@code publishAcknowledge}, QoS 2 messages with
   * {@code publishReceived}; the subsequent release from the client is answered with
   * {@code publishComplete}. After every received message a reply is sent via
   * {@link #send(MqttEndpoint)}.
   *
   * @param endpoint the accepted client connection
   */
  private void receiver(MqttEndpoint endpoint) {
    endpoint.publishHandler(p -> {
      log.info("Server received message [{}] with QoS [{}]", p.payload().toString(PAYLOAD_CHARSET), p.qosLevel());
      if (p.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
        endpoint.publishAcknowledge(p.messageId());
      } else if (p.qosLevel() == MqttQoS.EXACTLY_ONCE) {
        endpoint.publishReceived(p.messageId());
      }
      send(endpoint);
    })
      .publishReleaseHandler(endpoint::publishComplete);
  }

  /**
   * Publishes the fixed greeting to the given client on {@link MqttClientApp#MQTT_TOPIC} with
   * QoS 0 (not a duplicate, not retained) and logs the outcome.
   *
   * @param endpoint the client connection to publish to
   */
  private void send(MqttEndpoint endpoint) {
    Buffer payload = Buffer.buffer("server: hello world.");
    endpoint.publish(MqttClientApp.MQTT_TOPIC, payload, MqttQoS.AT_MOST_ONCE, false, false, s -> {
      if (s.succeeded()) {
        log.info("===>Server publish success: {}", s.result());
      } else {
        log.error("===>Server publish fail: ", s.cause());
      }
    });
  }

  /**
   * Builds the server options from the {@code server} section of the configuration.
   *
   * @param configProperties the mapped verticle configuration
   * @return options carrying the configured host and port
   */
  private MqttServerOptions create(ConfigProperties configProperties) {
    MqttServerOptions options = new MqttServerOptions();
    options.setPort(configProperties.getServer().getPort());
    options.setHost(configProperties.getServer().getHost());
    return options;
  }
}
4. MQTT client
public class MqttClientApp extends AbstractVerticle {
public static final String MQTT_TOPIC = "hello_topic";
@Override
public void start() {
MqttClient client = MqttClient.create(vertx, create());
// handler will be called when we have a message in topic we subscribe for
client.publishHandler(p -> {
log.info("Client received message on [{}] payload [{}] with QoS [{}]", p.topicName(), p.payload().toString(Charset.defaultCharset()), p.qosLevel());
});
client.connect(18003, "127.0.0.1", s -> {
if (s.succeeded()) {
log.info("Client connect success.");
subscribe(client);
} else {
log.error("Client connect fail: ", s.cause());
}
}).exceptionHandler(event -> {
log.error("client fail: ", event.getCause());
});
}
private void subscribe(MqttClient client) {
client.subscribe(MQTT_TOPIC, 0, e -> {
if (e.succeeded()) {
log.info("===>subscribe success: {}", e.result());
vertx.setPeriodic(10_000, l -> publish(client));
} else {
log.error("===>subscribe fail: ", e.cause());
}
});
}
private void publish(MqttClient client) {
Buffer payload = Buffer.buffer("client: hello world.");
client.publish(MQTT_TOPIC, payload, MqttQoS.AT_MOST_ONCE, false, false, s -> {
if (s.succeeded()) {
log.info("===>Client publish success: {}", s.result());
} else {
log.error("===>Client publish fail: ", s.cause());
}
});
}
private MqttClientOptions create() {
MqttClientOptions options = new MqttClientOptions();
options.setClientId("ClientId_" + RandomStringUtils.randomAlphanumeric(6));
options.setMaxMessageSize(100_000_000);
options.setKeepAliveInterval(2);
return options;
}
}
5. Result
2022-01-25 19:06:53.244 WARN 21 --- [ntloop-thread-1] lver.dns.DnsServerAddressStreamProviders---[ 70] : Can not find io.netty.resolver.dns.macos.MacOSDnsServerAddressStreamProvider in the classpath, fallback to system defaults. This may result in incorrect DNS resolutions on MacOS.
2022-01-25 19:06:53.291 INFO 21 --- [ntloop-thread-1] io.vertx.mqtt.impl.MqttClientImpl ---[ ] : Connection with 127.0.0.1:18003 established successfully
2022-01-25 19:06:53.432 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 34] : Client connect success.
2022-01-25 19:06:53.512 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 47] : ===>subscribe success: 1
2022-01-25 19:07:03.537 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 59] : ===>publish success: 2
2022-01-25 19:07:03.551 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 29] : Client received message on [hello_topic] payload [server: hello world.] with QoS [AT_MOST_ONCE]
2022-01-25 19:07:13.518 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 59] : ===>publish success: 3
2022-01-25 19:07:13.521 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 29] : Client received message on [hello_topic] payload [server: hello world.] with QoS [AT_MOST_ONCE]
2022-01-25 19:07:23.521 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 59] : ===>publish success: 4
6. Full project source
边栏推荐
- Advanced API (UDP connection & map set & collection set)
- 【CMake】CMake链接SQLite库
- Why is data service the direction of the next generation data center?
- Split small interface
- Interfaces and related concepts
- Strategy mode
- JUC forkjoinpool branch merge framework - work theft
- Specified interval inversion in the linked list
- pgAdmin 4 v6.11 发布,PostgreSQL 开源图形化管理工具
- 带你全流程,全方位的了解属于测试的软件事故
猜你喜欢
SecureCRT取消Session记录的密码
Leetcode 213: 打家劫舍 II
【已解决】SQLException: Invalid value for getInt() - ‘田鹏‘
How long is the fastest time you can develop data API? One minute is enough for me
Final, override, polymorphism, abstraction, interface
Dora (discover offer request recognition) process of obtaining IP address
7.2刷题两个
Spa single page application
Interview questions about producers and consumers (important)
Understanding of class
随机推荐
高并发内存池
Advanced API (UDP connection & map set & collection set)
High concurrency memory pool
Beginners use Minio
IP home online query platform
Common problems in io streams
IO stream system and FileReader, filewriter
[untitled]
7.2 brush two questions
2. E-commerce tool cefsharp autojs MySQL Alibaba cloud react C RPA automated script, open source log
Common APIs
Raspberry pie update tool chain
TypeScript let与var的区别
Various postures of CS without online line
Some experiences of Arduino soft serial port communication
SecureCRT取消Session记录的密码
LeetCode
你开发数据API最快多长时间?我1分钟就足够了
Topic | synchronous asynchronous
Jeecg menu path display problem