当前位置:网站首页>Mqtt+flink to subscribe and publish real-time messages
Mqtt+flink to subscribe and publish real-time messages
2022-06-23 08:50:00 【//Continuous margin_ documentary】
List of articles
1. Add the MQTT dependency
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
2. MQTT configuration class: MqttConfig.java
import java.io.Serializable;
/**
 * Immutable holder for the settings needed to connect to an MQTT broker and subscribe to topics.
 *
 * <p>The class implements {@link Serializable} so that instances can be serialized; all fields
 * are final strings, which keeps the serialized form trivial.
 */
public class MqttConfig implements Serializable {

    private static final long serialVersionUID = 1L;

    /** User name used for the connection. */
    private final String username;

    /** Password used for the connection. */
    private final String password;

    /** Broker address and port, e.g. {@code tcp://host:1883}. */
    private final String hostUrl;

    /**
     * Client identifier. It must not duplicate the id of another connection, otherwise the
     * connection will fail.
     */
    private final String clientId;

    /** Subscribed topics as a single comma-separated string. */
    private final String msgTopic;

    /**
     * Creates a configuration.
     *
     * @param username user name used for the connection
     * @param password password used for the connection
     * @param hostUrl broker address and port
     * @param clientId client identifier, unique per connection
     * @param msgTopic comma-separated list of topics to subscribe to
     */
    public MqttConfig(String username, String password, String hostUrl, String clientId, String msgTopic) {
        this.username = username;
        this.password = password;
        this.hostUrl = hostUrl;
        this.clientId = clientId;
        this.msgTopic = msgTopic;
    }

    /** @return the user name */
    public String getUsername() {
        return username;
    }

    /** @return the password */
    public String getPassword() {
        return password;
    }

    /** @return the client id */
    public String getClientId() {
        return clientId;
    }

    /** @return the broker URL */
    public String getHostUrl() {
        return hostUrl;
    }

    /**
     * Splits the configured topic string on commas.
     *
     * @return one array element per comma-separated topic; a fresh array on every call
     */
    public String[] getMsgTopic() {
        return msgTopic.split(",");
    }
}
3. MQTT consumer implementation: MqttConsumer.java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Flink source that subscribes to MQTT topics and emits every received payload as a String.
 *
 * <p>Messages are delivered to {@link MsgCallback#messageArrived}, handed over through a small
 * bounded queue, and emitted by {@link #run} until {@link #cancel()} is called.
 *
 * <p>NOTE(review): every instance connects with the same hard-coded client id. The MqttConfig
 * comments warn that a duplicated client id makes the connection fail, so this source presumably
 * has to run with parallelism 1 unless a per-instance id is introduced — confirm.
 *
 * @author zhongyulin
 */
public class MqttConsumer extends RichParallelSourceFunction<String> {

    private static final long serialVersionUID = 1L;

    /** Capacity of the hand-over queue between the MQTT callback and the emitting loop. */
    private static final int QUEUE_CAPACITY = 10;

    /** How long {@link #run} waits for a message before re-checking the running flag. */
    private static final long POLL_TIMEOUT_MS = 100L;

    /** Pause between two reconnection attempts. */
    private static final long RECONNECT_DELAY_MS = 1000L;

    // Per-instance (not static) so that several source instances in one JVM do not overwrite
    // each other's client; transient because the client is created in connect(), not shipped.
    private transient volatile MqttClient client;

    // Only exposed through the getter/setter below; nothing in this class reads it.
    private transient MqttTopic mqttTopic;

    // The blocking queue stores subscribed messages until run() emits them.
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    // Cleared by cancel(); checked by the emitting loop and by the reconnect loop.
    private volatile boolean running = true;

    /**
     * Creates the client, registers the callback, connects and subscribes to all configured
     * topics with QoS 0.
     *
     * @throws MqttException if connecting or subscribing fails; the failure is reported to the
     *     caller instead of being swallowed, because without a connection the source would
     *     otherwise wait forever without producing anything
     */
    private void connect() throws MqttException {
        // Configure connection parameters
        MqttConfig mqttConfigBean = new MqttConfig(" user name ", " password ", "tcp:// The server ip:1883", "mqtt", " Customize ");

        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false); // Whether to remove session
        options.setUserName(mqttConfigBean.getUsername());
        options.setPassword(mqttConfigBean.getPassword().toCharArray());
        // Set timeout
        options.setConnectionTimeout(30);
        // Set session heartbeat time
        options.setKeepAliveInterval(20);

        String[] topics = mqttConfigBean.getMsgTopic();
        // A new int[] is zero-filled, i.e. every topic is subscribed with QoS 0.
        int[] qos = new int[topics.length];

        // Connect mqtt The server
        MqttClient newClient = new MqttClient(mqttConfigBean.getHostUrl(), mqttConfigBean.getClientId(), new MemoryPersistence());
        client = newClient;
        try {
            newClient.setCallback(new MsgCallback(newClient, options, topics, qos));
            newClient.connect(options);
            newClient.subscribe(topics, qos);
            System.out.println("MQTT Successful connection :" + mqttConfigBean.getClientId() + ":" + newClient);
        } catch (MqttException e) {
            System.out.println("MQTT Abnormal connection :" + e);
            throw e;
        }
    }

    /** Disconnects and closes the client if one was created; failures are only reported. */
    private void disconnect() {
        MqttClient current = client;
        if (current == null) {
            return;
        }
        try {
            if (current.isConnected()) {
                current.disconnect();
            }
            current.close();
        } catch (MqttException e) {
            System.out.println("MQTT Abnormal disconnection :" + e);
        }
    }

    /** MQTT callback that forwards received payloads to the enclosing source's queue. */
    class MsgCallback implements MqttCallback {
        private final MqttClient client;
        private final MqttConnectOptions options;
        private final String[] topic;
        private final int[] qos;

        /** Kept for compatibility; a callback created this way cannot reconnect. */
        public MsgCallback() {
            this(null, null, null, null);
        }

        /**
         * @param client client used to reconnect and re-subscribe
         * @param options connect options reused on reconnection
         * @param topic topics to re-subscribe to
         * @param qos QoS per topic, same length as {@code topic}
         */
        public MsgCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
            this.client = client;
            this.options = options;
            this.topic = topic;
            this.qos = qos;
        }

        /**
         * Retries connecting and subscribing once per second until it succeeds, the source is
         * cancelled, or the calling thread is interrupted.
         */
        @Override
        public void connectionLost(Throwable throwable) {
            System.out.println("MQTT Connection is broken , Launch a reconnection ");
            while (running) {
                try {
                    Thread.sleep(RECONNECT_DELAY_MS);
                    // Skip connect() when a previous attempt connected but failed to subscribe.
                    if (!client.isConnected()) {
                        client.connect(options);
                    }
                    // Subscribe to news
                    client.subscribe(topic, qos);
                    System.out.println("MQTT Reconnection successful :" + client);
                    return;
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the owner of this thread and stop retrying.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Decodes the payload as UTF-8 text and queues it; blocks while the queue is full.
         *
         * @param s topic the message arrived on
         * @param message received message
         */
        @Override
        public void messageArrived(String s, MqttMessage message) throws Exception {
            String msg = new String(message.getPayload(), StandardCharsets.UTF_8);
            // Print the topic of this message, not the array of subscribed topics.
            System.out.println("topic:" + s);
            queue.put(msg);
        }

        /**
         * Serializes an object with Java serialization.
         *
         * @param obj object to serialize, may be {@code null}
         * @return the serialized bytes, or {@code null} when {@code obj} is {@code null}
         * @throws Exception if serialization fails
         */
        public byte[] getBytesFromObject(Serializable obj) throws Exception {
            if (obj == null) {
                return null;
            }
            ByteArrayOutputStream bo = new ByteArrayOutputStream();
            // Closing the object stream flushes it before the buffer is read.
            try (ObjectOutputStream oo = new ObjectOutputStream(bo)) {
                oo.writeObject(obj);
            }
            return bo.toByteArray();
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            // Nothing is published by this source.
        }
    }

    /**
     * Connects to the broker and emits queued messages until {@link #cancel()} is called, then
     * disconnects the client.
     */
    @Override
    public void run(final SourceContext<String> ctx) throws Exception {
        try {
            connect();
            while (running) {
                // Wait with a timeout so that a cancel() is noticed even when no message arrives.
                String message = queue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (message != null) {
                    ctx.collect(message);
                }
            }
        } finally {
            disconnect();
        }
    }

    /** Asks the emitting loop and any reconnect loop to stop. */
    @Override
    public void cancel() {
        running = false;
    }

    /**
     * Subscribe to a topic.
     *
     * @param topic topic filter
     * @param qos quality of service for the subscription
     * @throws IllegalStateException if no client has been created or set yet
     */
    public void subscribe(String topic, int qos) {
        MqttClient current = client;
        if (current == null) {
            throw new IllegalStateException("MQTT client has not been created yet");
        }
        try {
            System.out.println("topic:" + topic);
            current.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    public MqttClient getClient() {
        return client;
    }

    public void setClient(MqttClient client) {
        this.client = client;
    }

    public MqttTopic getMqttTopic() {
        return mqttTopic;
    }

    public void setMqttTopic(MqttTopic mqttTopic) {
        this.mqttTopic = mqttTopic;
    }
}
4. Flink job entry class
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.*;
/** Flink streaming job that reads MQTT messages and prints them as two-field tuples. */
public class StreamingJob {

    // NOTE(review): the original snippet indexed an undeclared `str` array. Splitting the
    // payload on a comma is an assumption about the message format — confirm with the publisher.
    private static final String FIELD_DELIMITER = ",";

    /**
     * Skeleton for a Flink Streaming Job.
     *
     * <p>For a tutorial how to write a Flink streaming application, check the
     * tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
     *
     * <p>To package your application into a JAR file for execution, run
     * 'mvn clean package' on the command line.
     *
     * <p>If you change the name of the main class (with the public static void main(String[] args))
     * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Receive messages and convert data into streams
        DataStream<String> stream = env.addSource(new MqttConsumer());

        // Transform the form of a stream through a function
        DataStream<Tuple2<String, String>> dataStream = stream.flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {
                String[] str = value.split(FIELD_DELIMITER);
                // A flatMap may emit nothing: skip payloads without two fields instead of
                // failing the job with an ArrayIndexOutOfBoundsException.
                if (str.length < 2) {
                    return;
                }
                out.collect(Tuple2.of(str[0], str[1]));
            }
        }).setParallelism(1); // Number of threads in parallel

        dataStream.print();
        env.execute();
    }
}
边栏推荐
- Deep analysis and Simulation of vector
- 173. Binary Search Tree Iterator
- 6月《中國數據庫行業分析報告》發布!智能風起,列存更生
- Use newbeecoder UI implements data paging
- 65. Valid Number
- Unity grid programming 06
- The results of CDN node and source station are inconsistent
- 词性家族
- How to sort a dictionary by value or key?
- Object. Defineproperty() and data broker
猜你喜欢

986. Interval List Intersections

Basic use of check boxes and implementation of select all and invert selection functions

Self organizing map neural network (SOM)

Hongmeng reads the resource file

高通9x07两种启动模式

3. caller service call - dapr

谈谈 @Autowired 的实现原理

173. Binary Search Tree Iterator

Testing -- automated testing selenium (about API)

测试-- 自动化测试selenium(关于API)
随机推荐
Install a WGet for your win10
Lighthouse cloud desktop experience
Unity grid programming 06
Intelligent operation and maintenance exploration | anomaly detection method in cloud system
[qnx hypervisor 2.2 user manual]6.2 network
Subsets II of leetcode topic analysis
Use newbeecoder UI implements data paging
Interpretation of the most dirty technology in history, I can understand 60 it terms in seconds
What exactly is RT?
Leetcode topic analysis spiral matrix II
Basic use of check boxes and implementation of select all and invert selection functions
类型从属名称的使用必须以“typename”为前缀
Isomorphic strings for leetcode topic resolution
vector的深度剖析及模拟实现
XSS via host header
How to sort a dictionary by value or key?
测试-- 自动化测试selenium(关于API)
usb peripheral 驱动 - configfs
Can portals be the next decentraland?
[qnx hypervisor 2.2 user manual]6.1 using the QNX hypervisor system