Flink DataSource Trilogy, Part Two: Built-in Connectors
2020-11-06 20:12:00 【Programmer Xinchen】
Welcome to visit my GitHub
https://github.com/zq2599/blog_demos
Contents: a categorized summary of all my original articles with supporting source code, covering Java, Docker, Kubernetes, DevOps, and more;
An overview of this article
This is the second article in the Flink DataSource Trilogy series. In the previous article, Flink DataSource Trilogy, Part One: Direct API, we studied creating DataSources with the StreamExecutionEnvironment API. Today we practice Flink's built-in connectors, i.e. the red box in the figure below; these connectors are used via the addSource method of StreamExecutionEnvironment:
Today's exercise uses Kafka as the data source: first we receive and process messages of type String, then we receive JSON messages and deserialize the JSON into bean instances;
Links to the Flink DataSource Trilogy
- Flink DataSource Trilogy, Part One: Direct API
- Flink DataSource Trilogy, Part Two: Built-in Connectors
- Flink DataSource Trilogy, Part Three: Custom DataSource
Source download
If you don't want to write the code yourself, the source for the whole series can be downloaded from GitHub; the addresses and link information are in the table below (https://github.com/zq2599/blog_demos):
Name | Link | Remarks
---|---|---
Project home page | https://github.com/zq2599/blog_demos | The project's home page on GitHub
git repository (https) | https://github.com/zq2599/blog_demos.git | Repository URL for the project's source code, https protocol
git repository (ssh) | git@github.com:zq2599/blog_demos.git | Repository URL for the project's source code, ssh protocol
This git project contains multiple folders; the application for this chapter is under the flinkdatasourcedemo folder, as shown in the red box below:
Environment and version
The environment and versions used in this article are as follows:
- JDK:1.8.0_211
- Flink:1.9.2
- Maven:3.6.0
- operating system :macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
- IDEA:2018.3.5 (Ultimate Edition)
- Kafka:2.4.0
- Zookeeper:3.5.5
Please make sure the above environment is ready before continuing;
Matching Flink and Kafka versions
- The Kafka versions that Flink officially supports are documented in detail at: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
- What we care about is the universal Kafka connector mentioned there, which was introduced in Flink 1.7 and works with Kafka 1.0.0 and later:
- In the red box below are the libraries my project depends on; in the blue box are the classes used to connect to Kafka. You can find the appropriate libraries and classes for your own Kafka version in the table:
Hands-on: String message processing
- Create a topic named test001 on Kafka; reference command:
./kafka-topics.sh \
--create \
--zookeeper 192.168.50.43:2181 \
--replication-factor 1 \
--partitions 2 \
--topic test001
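Optionally, you can confirm the topic was created with kafka-topics.sh's --describe option (this verification step is an addition to the original instructions; it assumes the same ZooKeeper address as above):
./kafka-topics.sh \
--describe \
--zookeeper 192.168.50.43:2181 \
--topic test001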
- Continue with the flinkdatasourcedemo project; open pom.xml and add the following dependency (the _2.11 suffix refers to the Scala version of the Flink build):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
- Create a new class, Kafka240String.java, which connects to the broker and performs a WordCount on the received string messages:
package com.bolingcavalry.connector;
import com.bolingcavalry.Splitter;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class Kafka240String {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set parallelism
        env.setParallelism(2);
        Properties properties = new Properties();
        // broker address
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
        // zookeeper address
        properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
        // consumer groupId
        properties.setProperty("group.id", "flink-connector");
        // instantiate the consumer class
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "test001",
                new SimpleStringSchema(),
                properties
        );
        // start consuming from the latest offset, i.e. skip all historical messages
        flinkKafkaConsumer.setStartFromLatest();
        // obtain the DataSource via the addSource method
        DataStream<String> dataStream = env.addSource(flinkKafkaConsumer);
        // after receiving string messages from kafka, split them into words and count them in 5-second windows
        dataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();
        env.execute("Connector DataSource demo : kafka");
    }
}
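The Splitter class comes from Part One of this series and isn't listed here; for reference, a minimal sketch of it (assuming it splits on spaces and emits one (word, 1) tuple per word) looks like this:
package com.bolingcavalry;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
        // emit a (word, 1) tuple for every space-separated word
        for (String word : sentence.split(" ")) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
}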
- Make sure the kafka topic has been created, then run Kafka240String; you can see that message consumption and word counting work as expected:
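One way to produce test messages is the console producer that ships with Kafka (the broker address below is the one used throughout this article; kafka-console-producer.sh lives in the same bin directory as kafka-topics.sh):
./kafka-console-producer.sh \
--broker-list 192.168.50.43:9092 \
--topic test001
Each line typed at the prompt becomes one Kafka message, e.g. hello world hello flink.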
- Receiving kafka string messages is now done; next we try messages in JSON format;
Hands-on: JSON message processing
- Next we receive messages in JSON format and deserialize them into bean instances; this requires a JSON library, and I chose Gson;
- Add the gson dependency in pom.xml:
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
- Add the class Student.java, an ordinary bean with only two fields, id and name:
package com.bolingcavalry;
public class Student {
    private int id;
    private String name;
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}
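As a quick sanity check (GsonCheck is a hypothetical helper class added for illustration, not part of the original project), Gson maps a JSON string onto this bean by field name:
package com.bolingcavalry;
import com.google.gson.Gson;
public class GsonCheck {
    public static void main(String[] args) {
        // JSON keys must match the bean's field names (id, name)
        Student student = new Gson().fromJson("{\"id\":1,\"name\":\"tom\"}", Student.class);
        System.out.println(student.getId() + " : " + student.getName()); // prints: 1 : tom
    }
}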
- Add the class StudentSchema.java, an implementation of the DeserializationSchema interface, used to deserialize JSON into Student instances:
package com.bolingcavalry.connector;
import com.bolingcavalry.Student;
import com.google.gson.Gson;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
public class StudentSchema implements DeserializationSchema<Student>, SerializationSchema<Student> {
    private static final Gson gson = new Gson();
    /**
     * Deserialization: convert a byte array into a Student instance
     * @param bytes
     * @return
     * @throws IOException
     */
    @Override
    public Student deserialize(byte[] bytes) throws IOException {
        return gson.fromJson(new String(bytes), Student.class);
    }
    @Override
    public boolean isEndOfStream(Student student) {
        return false;
    }
    /**
     * Serialization: convert a Student instance into a byte array
     * @param student
     * @return
     */
    @Override
    public byte[] serialize(Student student) {
        return new byte[0];
    }
    @Override
    public TypeInformation<Student> getProducedType() {
        return TypeInformation.of(Student.class);
    }
}
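To see the schema behave the way the Kafka consumer will use it, here is a minimal standalone check (StudentSchemaCheck is a hypothetical class added for illustration; it assumes UTF-8 encoded message bytes):
package com.bolingcavalry.connector;
import com.bolingcavalry.Student;
public class StudentSchemaCheck {
    public static void main(String[] args) throws Exception {
        StudentSchema schema = new StudentSchema();
        // the consumer hands the schema the raw message bytes; simulate that here
        byte[] bytes = "{\"id\":2,\"name\":\"jerry\"}".getBytes("UTF-8");
        Student student = schema.deserialize(bytes);
        System.out.println(student.getName()); // prints: jerry
    }
}
Note that serialize returns an empty array because this article only consumes from Kafka; a producer-side schema would need a real implementation.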
- Create a new class, Kafka240Bean.java, which connects to the broker, converts the received JSON messages into Student instances, and counts the occurrences of each name, again in 5-second windows:
package com.bolingcavalry.connector;
import com.bolingcavalry.Student;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class Kafka240Bean {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set parallelism
        env.setParallelism(2);
        Properties properties = new Properties();
        // broker address
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
        // zookeeper address
        properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
        // consumer groupId
        properties.setProperty("group.id", "flink-connector");
        // instantiate the consumer class, using StudentSchema to deserialize messages
        FlinkKafkaConsumer<Student> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "test001",
                new StudentSchema(),
                properties
        );
        // start consuming from the latest offset, i.e. skip all historical messages
        flinkKafkaConsumer.setStartFromLatest();
        // obtain the DataSource via the addSource method
        DataStream<Student> dataStream = env.addSource(flinkKafkaConsumer);
        // the JSON from kafka has been deserialized into Student instances; count each name in 5-second windows
        dataStream.map(new MapFunction<Student, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(Student student) throws Exception {
                        return new Tuple2<>(student.getName(), 1);
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();
        env.execute("Connector DataSource demo : kafka bean");
    }
}
- To test, send JSON-format strings to kafka; Flink will then count the occurrences of each name:
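For example, with the console producer from the string exercise (same broker and topic as above), type a few JSON lines at the prompt:
./kafka-console-producer.sh \
--broker-list 192.168.50.43:9092 \
--topic test001
{"id":1,"name":"tom"}
{"id":2,"name":"jerry"}
{"id":3,"name":"tom"}
If these three messages land in the same 5-second window, the printed counts would be (tom,2) and (jerry,1).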
At this point, the hands-on work with built-in connectors is complete; in the next article, we will create a custom DataSource together;
Welcome to follow my WeChat official account: search for 「Programmer Xinchen」. I'm Xinchen, looking forward to exploring the Java world with you... https://github.com/zq2599/blog_demos