
Flink's DataSource Trilogy 2: Built-in Connectors

2020-11-06 20:12:00 Programmer Xinchen

Welcome to visit my GitHub: a categorized summary of all my original articles with supporting source code, covering Java, Docker, Kubernetes, DevOps and more.

An overview of this article

This article is the second part of the series "Flink's DataSource Trilogy". The previous part, "Flink's DataSource Trilogy 1: Direct API", showed how to create a DataSource through the StreamExecutionEnvironment API. Today we practice Flink's built-in connectors, i.e. the area in the red box in the figure below. These connectors are used through the addSource method of StreamExecutionEnvironment:

[Figure: Flink's built-in connectors (red box)]

For today's practice, Kafka is used as the data source. We first receive and process messages of type String, then receive JSON messages and deserialize them into bean instances.

Links to the Flink DataSource Trilogy articles

  1. "Flink's DataSource Trilogy 1: Direct API"
  2. "Flink's DataSource Trilogy 2: Built-in Connectors"
  3. "Flink's DataSource Trilogy 3: Custom DataSource"

Downloading the source code

If you don't want to write the code yourself, the source code of the whole series can be downloaded from GitHub (https://github.com/zq2599/blog_demos). The addresses are listed in the following table:

Name | Link | Remarks
Project home page | https://github.com/zq2599/blog_demos | The project's home page on GitHub
Git repository address (https) | https://github.com/zq2599/blog_demos.git | Repository address of the project's source code, https protocol
Git repository address (ssh) | git@github.com:zq2599/blog_demos.git | Repository address of the project's source code, ssh protocol

This git project contains multiple folders. The application for this article is in the flinkdatasourcedemo folder, shown in the red box below:

[Figure: the flinkdatasourcedemo folder in the repository (red box)]

Environment and versions

The environment and versions used in this article are as follows:

  1. JDK:1.8.0_211
  2. Flink:1.9.2
  3. Maven:3.6.0
  4. Operating system: macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
  5. IDEA:2018.3.5 (Ultimate Edition)
  6. Kafka:2.4.0
  7. Zookeeper:3.5.5

Please make sure all of the above are ready before continuing.

Matching Flink and Kafka versions

  1. The official Flink documentation explains in detail which Kafka versions each connector supports: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
  2. Focus on the universal Kafka connector mentioned there. It has been available since Flink 1.7 and can be used with Kafka 1.0.0 or later:

[Figure: the universal Kafka connector in the Flink documentation]

  3. In the figure below, the red box marks the library my project depends on, and the blue box marks the class used to connect to Kafka. Use the table to find the library and class that match your Kafka version:

[Figure: Kafka connector libraries and consumer classes by Kafka version]

Hands-on: processing string messages

  1. Create a topic named test001 on Kafka with the following command:
./kafka-topics.sh \
--create \
--zookeeper \
--replication-factor 1 \
--partitions 2 \
--topic test001
  2. Keep using the flinkdatasourcedemo project: open pom.xml and add the following dependencies:
  3. Create a new class Kafka240String.java, which connects to the broker and performs a WordCount on the received string messages:
package com.bolingcavalry.connector;

import com.bolingcavalry.Splitter;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class Kafka240String {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set parallelism (value assumed, matching the 2 partitions of test001)
        env.setParallelism(2);

        Properties properties = new Properties();
        // Broker address: fill in your own, e.g. host:9092
        properties.setProperty("bootstrap.servers", "");
        // ZooKeeper address: fill in your own (not used by the universal connector)
        properties.setProperty("zookeeper.connect", "");
        // Consumer group id
        properties.setProperty("group.id", "flink-connector");
        // Instantiate the consumer: topic, deserializer, connection properties
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "test001",
                new SimpleStringSchema(),
                properties
        );
        // Start consuming from the latest offset, i.e. skip historical messages
        flinkKafkaConsumer.setStartFromLatest();

        // Obtain the DataSource through addSource
        DataStream<String> dataStream = env.addSource(flinkKafkaConsumer);

        // Split each string message from Kafka into words and count them in 5-second windows
        dataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("Connector DataSource demo : kafka");
    }
}
  4. Make sure the Kafka topic has been created, then run Kafka240String. The output shows that message consumption and word counting work correctly:

[Figure: word-count output of Kafka240String]

  5. Receiving Kafka string messages is done. Next, let's try JSON-format messages.
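What flatMap, keyBy, timeWindow(Time.seconds(5)) and sum compute together in Kafka240String can be imitated in plain Java. This is a minimal sketch, not Flink code: it assumes Splitter (defined elsewhere in the flinkdatasourcedemo project and not shown in this article) splits each message on whitespace and emits (word, 1), and it uses hypothetical arrival timestamps in milliseconds. Each message falls into the 5-second tumbling window starting at timestamp - timestamp % 5000, and counts are summed per word inside that window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowedWordCountSketch {

    // A message plus the (hypothetical) time it arrived, in milliseconds
    static final class Message {
        final long timestampMillis;
        final String text;

        Message(long timestampMillis, String text) {
            this.timestampMillis = timestampMillis;
            this.text = text;
        }
    }

    static final long WINDOW_SIZE_MILLIS = 5_000L;

    // Returns windowStart -> (word -> count); TreeMaps keep the output ordered
    static Map<Long, Map<String, Integer>> count(List<Message> messages) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (Message m : messages) {
            // Tumbling window: every timestamp belongs to exactly one 5-second window
            long windowStart = m.timestampMillis - (m.timestampMillis % WINDOW_SIZE_MILLIS);
            Map<String, Integer> counts = result.computeIfAbsent(windowStart, k -> new TreeMap<>());
            // Assumed Splitter behaviour: split on whitespace, emit (word, 1)
            for (String word : m.text.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    // keyBy(word) followed by sum(1): add 1 per occurrence
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(1_000L, "hello flink"));
        messages.add(new Message(3_000L, "hello kafka"));
        messages.add(new Message(6_000L, "hello"));
        System.out.println(count(messages));
    }
}
```

Running main prints {0={flink=1, hello=2, kafka=1}, 5000={hello=1}}: the first two messages share the window [0, 5000) and the third falls into [5000, 10000). In Flink 1.9 the default time characteristic is processing time, and tumbling windows without an offset are aligned to the epoch the same way the modulo above expresses.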

Hands-on: processing JSON messages

  1. Next we receive JSON-format messages and deserialize them into bean instances. This needs a JSON library; I chose gson.
  2. Add the gson dependency to pom.xml:
  3. Add the class Student.java, an ordinary bean with only two fields, id and name:
package com.bolingcavalry;

// Plain bean: gson maps the JSON keys "id" and "name" onto these fields by name
public class Student {

    private int id;

    private String name;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
  4. Add the class StudentSchema.java, an implementation of the DeserializationSchema interface that deserializes JSON into Student instances:
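package com.bolingcavalry.connector;

import com.bolingcavalry.Student;
import com.google.gson.Gson;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class StudentSchema implements DeserializationSchema<Student>, SerializationSchema<Student> {

    private static final Gson gson = new Gson();

    /**
     * Deserialize: convert a byte array into a Student instance
     * @param bytes the raw Kafka message value, expected to be UTF-8 JSON
     * @return the Student parsed from the JSON
     * @throws IOException declared by the interface
     */
    @Override
    public Student deserialize(byte[] bytes) throws IOException {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset
        return gson.fromJson(new String(bytes, StandardCharsets.UTF_8), Student.class);
    }

    // A Kafka topic is an unbounded stream, so no message marks the end of the stream
    @Override
    public boolean isEndOfStream(Student student) {
        return false;
    }

    /**
     * Serialize: convert a Student instance into a byte array
     * @param student the instance to serialize
     * @return the UTF-8 JSON bytes
     */
    @Override
    public byte[] serialize(Student student) {
        return gson.toJson(student).getBytes(StandardCharsets.UTF_8);
    }

    // Tells Flink the type of the elements this schema produces
    @Override
    public TypeInformation<Student> getProducedType() {
        return TypeInformation.of(Student.class);
    }
}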
  5. Create a new class Kafka240Bean.java, which connects to the broker, converts the received JSON messages into Student instances and counts each name, again in 5-second windows:
package com.bolingcavalry.connector;

import com.bolingcavalry.Student;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class Kafka240Bean {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set parallelism (value assumed)
        env.setParallelism(2);

        Properties properties = new Properties();
        // Broker address: fill in your own, e.g. host:9092
        properties.setProperty("bootstrap.servers", "");
        // ZooKeeper address: fill in your own (not used by the universal connector)
        properties.setProperty("zookeeper.connect", "");
        // Consumer group id
        properties.setProperty("group.id", "flink-connector");
        // Instantiate the consumer; StudentSchema turns each JSON message into a Student
        FlinkKafkaConsumer<Student> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "test001", // topic name assumed; use the topic you send JSON messages to
                new StudentSchema(),
                properties
        );
        // Start consuming from the latest offset, i.e. skip historical messages
        flinkKafkaConsumer.setStartFromLatest();

        // Obtain the DataSource through addSource
        DataStream<Student> dataStream = env.addSource(flinkKafkaConsumer);

        // Map each Student to (name, 1), then count each name in 5-second windows
        dataStream.map(new MapFunction<Student, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(Student student) throws Exception {
                        return new Tuple2<>(student.getName(), 1);
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("Connector DataSource demo : kafka bean");
    }
}
  6. To test, send JSON-format strings to Kafka; Flink then counts the occurrences of each name:

[Figure: per-name counts printed by Kafka240Bean]

That completes the hands-on practice with the built-in connector. In the next article, we will build a custom DataSource together.
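The final test step relies on each Kafka message being a JSON object whose keys match the Student fields id and name, because gson fills fields by name. As a helper for generating such test input (for example, lines to paste into kafka-console-producer.sh), here is a JDK-only sketch. The class name and sample values are hypothetical, and only quotes, backslashes and control characters are escaped: enough for simple names, but not a full JSON serializer.

```java
public class StudentJsonSketch {

    // Escape the characters JSON requires to be escaped inside a string literal
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Other control characters use the 4-hex-digit escape form
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Build one message; the key names must match Student's field names
    static String toJson(int id, String name) {
        return "{\"id\":" + id + ",\"name\":\"" + escape(name) + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(toJson(1, "Tom"));
        System.out.println(toJson(2, "Jerry"));
        System.out.println(toJson(3, "Tom"));
    }
}
```

main prints {"id":1,"name":"Tom"}, {"id":2,"name":"Jerry"} and {"id":3,"name":"Tom"}, one per line. Sending these three lines within one 5-second window should make Kafka240Bean report a count of 2 for Tom and 1 for Jerry.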


This article was written by [Programmer Xinchen]; please include a link to the original when reposting. Thanks.