当前位置:网站首页>Flink的DataSource三部曲之二:内置connector
Flink的DataSource三部曲之二:内置connector
2020-11-06 20:12:00 【程序员欣宸】
欢迎访问我的GitHub
https://github.com/zq2599/blog_demos
内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;
本篇概览
本文是《Flink的DataSource三部曲》系列的第二篇,上一篇《Flink的DataSource三部曲之一:直接API》学习了StreamExecutionEnvironment的API创建DataSource,今天要练习的是Flink内置的connector,即下图的红框位置,这些connector可以通过StreamExecutionEnvironment的addSource方法使用:
今天的实战选择Kafka作为数据源来操作,先尝试接收和处理String型的消息,再接收JSON类型的消息,将JSON反序列化成bean实例;
Flink的DataSource三部曲文章链接
源码下载
如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):
名称 | 链接 | 备注 |
---|---|---|
项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |
git仓库地址(ssh) | [email protected]:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
这个git项目中有多个文件夹,本章的应用在flinkdatasourcedemo文件夹下,如下图红框所示:
环境和版本
本次实战的环境和版本如下:
- JDK:1.8.0_211
- Flink:1.9.2
- Maven:3.6.0
- 操作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
- IDEA:2018.3.5 (Ultimate Edition)
- Kafka:2.4.0
- Zookeeper:3.5.5
请确保上述内容都已经准备就绪,才能继续后面的实战;
Flink与Kafka版本匹配
- Flink官方对匹配Kafka版本做了详细说明,地址是:https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
- 要重点关注的是官方提到的通用版(universal Kafka connector ),这是从Flink1.7开始推出的,对于Kafka1.0.0或者更高版本都可以使用:
3. 下图红框中是我的工程中要依赖的库,蓝框中是连接Kafka用到的类,读者您可以根据自己的Kafka版本在表格中找到适合的库和类:
实战字符串消息处理
- 在kafka上创建名为test001的topic,参考命令:
./kafka-topics.sh \
--create \
--zookeeper 192.168.50.43:2181 \
--replication-factor 1 \
--partitions 2 \
--topic test001
- 继续使用上一章创建的flinkdatasourcedemo工程,打开pom.xml文件增加以下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
- 新增类Kafka240String.java,作用是连接broker,对收到的字符串消息做WordCount操作:
package com.bolingcavalry.connector;
import com.bolingcavalry.Splitter;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
import static com.sun.tools.doclint.Entity.para;
public class Kafka240String {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(2);
Properties properties = new Properties();
//broker地址
properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
//zookeeper地址
properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
//消费者的groupId
properties.setProperty("group.id", "flink-connector");
//实例化Consumer类
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
"test001",
new SimpleStringSchema(),
properties
);
//指定从最新位置开始消费,相当于放弃历史消息
flinkKafkaConsumer.setStartFromLatest();
//通过addSource方法得到DataSource
DataStream<String> dataStream = env.addSource(flinkKafkaConsumer);
//从kafka取得字符串消息后,分割成单词,统计数量,窗口是5秒
dataStream
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
.print();
env.execute("Connector DataSource demo : kafka");
}
}
- 确保kafka的topic已经创建,将Kafka240运行起来,可见消费消息并进行单词统计的功能是正常的:
5. 接收kafka字符串消息的实战已经完成,接下来试试JSON格式的消息;
实战JSON消息处理
- 接下来要接受的JSON格式消息,可以被反序列化成bean实例,会用到JSON库,我选择的是gson;
- 在pom.xml增加gson依赖:
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
- 增加类Student.java,这是个普通的Bean,只有id和name两个字段:
package com.bolingcavalry;
public class Student {
private int id;
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
- 增加类StudentSchema.java,该类是DeserializationSchema接口的实现,将JSON反序列化成Student实例时用到:
ackage com.bolingcavalry.connector;
import com.bolingcavalry.Student;
import com.google.gson.Gson;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
public class StudentSchema implements DeserializationSchema<Student>, SerializationSchema<Student> {
private static final Gson gson = new Gson();
/**
* 反序列化,将byte数组转成Student实例
* @param bytes
* @return
* @throws IOException
*/
@Override
public Student deserialize(byte[] bytes) throws IOException {
return gson.fromJson(new String(bytes), Student.class);
}
@Override
public boolean isEndOfStream(Student student) {
return false;
}
/**
* 序列化,将Student实例转成byte数组
* @param student
* @return
*/
@Override
public byte[] serialize(Student student) {
return new byte[0];
}
@Override
public TypeInformation<Student> getProducedType() {
return TypeInformation.of(Student.class);
}
}
- 新增类Kafka240Bean.java,作用是连接broker,对收到的JSON消息转成Student实例,统计每个名字出现的数量,窗口依旧是5秒:
package com.bolingcavalry.connector;
import com.bolingcavalry.Splitter;
import com.bolingcavalry.Student;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class Kafka240Bean {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(2);
Properties properties = new Properties();
//broker地址
properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
//zookeeper地址
properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
//消费者的groupId
properties.setProperty("group.id", "flink-connector");
//实例化Consumer类
FlinkKafkaConsumer<Student> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
"test001",
new StudentSchema(),
properties
);
//指定从最新位置开始消费,相当于放弃历史消息
flinkKafkaConsumer.setStartFromLatest();
//通过addSource方法得到DataSource
DataStream<Student> dataStream = env.addSource(flinkKafkaConsumer);
//从kafka取得的JSON被反序列化成Student实例,统计每个name的数量,窗口是5秒
dataStream.map(new MapFunction<Student, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Student student) throws Exception {
return new Tuple2<>(student.getName(), 1);
}
})
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
.print();
env.execute("Connector DataSource demo : kafka bean");
}
}
- 在测试的时候,要向kafka发送JSON格式字符串,flink这边就会给统计出每个name的数量:
至此,内置connector的实战就完成了,接下来的章节,我们将要一起实战自定义DataSource;
欢迎关注公众号:程序员欣宸
微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界... https://github.com/zq2599/blog_demos
版权声明
本文为[程序员欣宸]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4198380/blog/4705334
边栏推荐
- In depth understanding of the construction of Intelligent Recommendation System
- 安装Anaconda3 后,怎样使用 Python 2.7?
- iptables基礎原理和使用簡介
- 选择站群服务器的有哪些标准呢?
- How to demote a domain controller in Windows Server 2012 and later
- 網路程式設計NIO:BIO和NIO
- Elasticsearch 第六篇:聚合統計查詢
- Azure Data Factory(三)整合 Azure Devops 實現CI/CD
- TensorFlow2.0 问世,Pytorch还能否撼动老大哥地位?
- Every day we say we need to do performance optimization. What are we optimizing?
猜你喜欢
Anomaly detection method based on SVM
GUI 引擎评价指标
windows10 tensorflow(二)原理实战之回归分析,深度学习框架(梯度下降法求解回归参数)
Technical director, to just graduated programmers a word - do a good job in small things, can achieve great things
DevOps是什么
谁说Cat不能做链路跟踪的,给我站出来
Pycharm快捷键 自定义功能形式
哇,ElasticSearch多字段权重排序居然可以这么玩
通过深层神经网络生成音乐
mac 下常用快捷键,mac启动ftp
随机推荐
Electron应用使用electron-builder配合electron-updater实现自动更新
哇,ElasticSearch多字段权重排序居然可以这么玩
Kitty中的动态线程池支持Nacos,Apollo多配置中心了
利用 AWS SageMaker BlazingText 对不均衡文本进行多分类
mac 安装hanlp,以及win下安装与使用
Using tensorflow to forecast the rental price of airbnb in New York City
nlp模型-bert从入门到精通(二)
50 + open source projects are officially assembled, and millions of developers are voting
如果前端不使用SPA又能怎样?- Hacker News
直播预告 | 微服务架构学习系列直播第三期
7.3.2 File Download & big file download
用Python构建和可视化决策树
(1)ASP.NET Core3.1 Ocelot介紹
drf JWT認證模組與自定製
被老程式設計師壓榨怎麼辦?我不想辭職
文本去重的技术方案讨论(一)
【快速因數分解】Pollard's Rho 演算法
快快使用ModelArts,零基礎小白也能玩轉AI!
大数据应用的重要性体现在方方面面
向北京集结!OpenI/O 2020启智开发者大会进入倒计时