当前位置:网站首页>Sent by mqtt client server of vertx
Sending MQTT messages between a Vert.x client and server
2022-07-03 07:25:00 【Sleeping Empire】
Introduction
Sending and receiving MQTT messages with Vert.x, covering both the client and the server.
1. Maven project dependencies
<!-- Dependencies of the MQTT demo. Only vertx-common-core declares a version here;
     presumably the other versions are managed by a parent POM or BOM (TODO confirm). -->
<dependencies>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>
<!-- Vert.x MQTT module (the MqttServer / MqttClient API used by the two verticles). -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mqtt</artifactId>
</dependency>
<!-- Presumably used to load the YAML configuration file shown in the next section. -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-config-yaml</artifactId>
</dependency>
<!-- Presumably required for mapping the verticle config to a POJO via config().mapTo(...). -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Project-local module; presumably provides ConfigProperties (verify against that module). -->
<dependency>
<groupId>com.lance.common</groupId>
<artifactId>vertx-common-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
2. YAML configuration file
# Address the MQTT server binds to. host and port must be nested under "server"
# because the server verticle reads them through getServer().getHost()/getPort().
server:
  host: 127.0.0.1
  port: 18003
3. MQTT server
/**
 * Verticle that runs an MQTT server.
 *
 * <p>The bind host and port are read from the verticle configuration (mapped to
 * {@code ConfigProperties}). Every connecting client is accepted; each message a client
 * publishes is acknowledged according to its QoS and answered with a fixed greeting
 * published on {@link MqttClientApp#MQTT_TOPIC}.
 */
public class MqttServerApp extends AbstractVerticle {

  /**
   * Creates the MQTT server and starts listening.
   *
   * @param startPromise completed once the server is listening, failed with the listen error
   *     otherwise, so that the deployment result reflects the real server state
   * @throws Exception if the verticle configuration cannot be mapped to {@code ConfigProperties}
   */
  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    ConfigProperties properties = config().mapTo(ConfigProperties.class);
    int port = properties.getServer().getPort();
    log.info("===>json: {}, port: {}", properties, port);

    MqttServer mqttServer = MqttServer.create(vertx, create(properties));
    mqttServer.endpointHandler(this::onConnect)
        .exceptionHandler(t -> log.error("MQTT exception fail: ", t))
        .listen(ar -> {
          if (ar.succeeded()) {
            log.warn("MQTT server is listening on port: {}", ar.result().actualPort());
            // Signal the deployer that startup has finished.
            startPromise.complete();
          } else {
            log.error("Fail on starting the server: ", ar.cause());
            // Propagate the failure instead of leaving the deployment pending forever.
            startPromise.fail(ar.cause());
          }
        });
  }

  /**
   * Logs the main CONNECT information of a new client, accepts the connection and installs
   * the publish and disconnect handlers on its endpoint.
   *
   * @param endpoint the endpoint representing the connecting client
   */
  private void onConnect(MqttEndpoint endpoint) {
    log.info("MQTT client [{}] request to connect, clean session = {}", endpoint.clientIdentifier(), endpoint.isCleanSession());
    if (endpoint.auth() != null) {
      // Never write the client's password to the log; the username is enough for tracing.
      log.info("[username = {}, password = ***]", endpoint.auth().getUsername());
    }
    log.info("[properties = {}]", endpoint.connectProperties());
    if (endpoint.will() != null) {
      log.info("[will topic: {}, msg: {}, QoS: {}, isRetain: {}]", endpoint.will().getWillTopic(), endpoint.will().getWillMessageBytes(), endpoint.will().getWillQos(), endpoint.will().isWillRetain());
    }
    log.info("[keep alive timeout = {}]", endpoint.keepAliveTimeSeconds());

    // Accept the connection from the remote client.
    endpoint.accept(true);
    receiver(endpoint);
    endpoint.disconnectMessageHandler(disconnectMessage -> log.info("Received disconnect from client, reason code = {}", disconnectMessage.code()));
  }

  /**
   * Installs the handlers for messages published by the client: the message is logged,
   * acknowledged according to its QoS, and answered via {@link #send(MqttEndpoint)}.
   *
   * @param endpoint the accepted client endpoint
   */
  private void receiver(MqttEndpoint endpoint) {
    endpoint.publishHandler(p -> {
      // Decode as UTF-8 explicitly rather than with the platform default charset.
      log.info("Server received message [{}] with QoS [{}]", p.payload().toString("UTF-8"), p.qosLevel());
      if (p.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
        endpoint.publishAcknowledge(p.messageId());
      } else if (p.qosLevel() == MqttQoS.EXACTLY_ONCE) {
        endpoint.publishReceived(p.messageId());
      }
      send(endpoint);
    })
        // On a publish release from the client, reply with publish complete.
        .publishReleaseHandler(endpoint::publishComplete);
  }

  /**
   * Publishes the fixed greeting to the client on {@link MqttClientApp#MQTT_TOPIC} with
   * QoS {@code AT_MOST_ONCE} and logs the outcome.
   *
   * @param endpoint the client endpoint to publish to
   */
  private void send(MqttEndpoint endpoint) {
    Buffer payload = Buffer.buffer("server: hello world.");
    endpoint.publish(MqttClientApp.MQTT_TOPIC, payload, MqttQoS.AT_MOST_ONCE, false, false, s -> {
      if (s.succeeded()) {
        log.info("===>Server publish success: {}", s.result());
      } else {
        log.error("===>Server publish fail: ", s.cause());
      }
    });
  }

  /**
   * Builds the server options from the configured host and port.
   *
   * @param configProperties the mapped verticle configuration
   * @return options carrying the configured bind host and port
   */
  private MqttServerOptions create(ConfigProperties configProperties) {
    MqttServerOptions options = new MqttServerOptions();
    options.setPort(configProperties.getServer().getPort());
    options.setHost(configProperties.getServer().getHost());
    return options;
  }
}
4. MQTT client
public class MqttClientApp extends AbstractVerticle {
public static final String MQTT_TOPIC = "hello_topic";
@Override
public void start() {
MqttClient client = MqttClient.create(vertx, create());
// handler will be called when we have a message in topic we subscribe for
client.publishHandler(p -> {
log.info("Client received message on [{}] payload [{}] with QoS [{}]", p.topicName(), p.payload().toString(Charset.defaultCharset()), p.qosLevel());
});
client.connect(18003, "127.0.0.1", s -> {
if (s.succeeded()) {
log.info("Client connect success.");
subscribe(client);
} else {
log.error("Client connect fail: ", s.cause());
}
}).exceptionHandler(event -> {
log.error("client fail: ", event.getCause());
});
}
private void subscribe(MqttClient client) {
client.subscribe(MQTT_TOPIC, 0, e -> {
if (e.succeeded()) {
log.info("===>subscribe success: {}", e.result());
vertx.setPeriodic(10_000, l -> publish(client));
} else {
log.error("===>subscribe fail: ", e.cause());
}
});
}
private void publish(MqttClient client) {
Buffer payload = Buffer.buffer("client: hello world.");
client.publish(MQTT_TOPIC, payload, MqttQoS.AT_MOST_ONCE, false, false, s -> {
if (s.succeeded()) {
log.info("===>Client publish success: {}", s.result());
} else {
log.error("===>Client publish fail: ", s.cause());
}
});
}
private MqttClientOptions create() {
MqttClientOptions options = new MqttClientOptions();
options.setClientId("ClientId_" + RandomStringUtils.randomAlphanumeric(6));
options.setMaxMessageSize(100_000_000);
options.setKeepAliveInterval(2);
return options;
}
}
5. Result
2022-01-25 19:06:53.244 WARN 21 --- [ntloop-thread-1] lver.dns.DnsServerAddressStreamProviders---[ 70] : Can not find io.netty.resolver.dns.macos.MacOSDnsServerAddressStreamProvider in the classpath, fallback to system defaults. This may result in incorrect DNS resolutions on MacOS.
2022-01-25 19:06:53.291 INFO 21 --- [ntloop-thread-1] io.vertx.mqtt.impl.MqttClientImpl ---[ ] : Connection with 127.0.0.1:18003 established successfully
2022-01-25 19:06:53.432 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 34] : Client connect success.
2022-01-25 19:06:53.512 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 47] : ===>subscribe success: 1
2022-01-25 19:07:03.537 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 59] : ===>publish success: 2
2022-01-25 19:07:03.551 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 29] : Client received message on [hello_topic] payload [server: hello world.] with QoS [AT_MOST_ONCE]
2022-01-25 19:07:13.518 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 59] : ===>publish success: 3
2022-01-25 19:07:13.521 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 29] : Client received message on [hello_topic] payload [server: hello world.] with QoS [AT_MOST_ONCE]
2022-01-25 19:07:23.521 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 59] : ===>publish success: 4
6. Full address of the project
边栏推荐
猜你喜欢

691. 立方体IV

【CoppeliaSim4.3】C#调用 remoteApi控制场景中UR5

691. Cube IV
![[solved] sqlexception: invalid value for getint() - 'Tian Peng‘](/img/bf/f6310304d58d964b3d09a9d011ddb5.png)
[solved] sqlexception: invalid value for getint() - 'Tian Peng‘

TCP cumulative acknowledgement and window value update

1. E-commerce tool cefsharp autojs MySQL Alibaba cloud react C RPA automated script, open source log

在 4EVERLAND 上存储 WordPress 媒体内容,完成去中心化存储

《指环王:力量之戒》新剧照 力量之戒铸造者亮相

VMWare网络模式-桥接,Host-Only,NAT网络

FileInputStream and fileoutputstream
随机推荐
Margin left: -100% understanding in the Grail layout
pgAdmin 4 v6.11 发布,PostgreSQL 开源图形化管理工具
OSI knowledge sorting
Chrome 98 Private Network Access problem w/ disabled web security: Request had no target IP address
3311. 最长算术
论文学习——鄱阳湖星子站水位时间序列相似度研究
Visit Google homepage to display this page, which cannot be displayed
Read config configuration file of vertx
The embodiment of generics in inheritance and wildcards
Use of other streams
II. D3.js draw a simple figure -- circle
Beginners use Minio
C代码生产YUV420 planar格式文件
Dora (discover offer request recognition) process of obtaining IP address
Advanced APL (realize group chat room)
Recursion, Fibonacci sequence
树莓派更新工具链
Distributed ID
Advanced API (UDP connection & map set & collection set)
“百度杯”CTF比赛 2017 二月场,Web:爆破-1