当前位置:网站首页>Flink的DataSource三部曲之一:直接API
Flink的DataSource三部曲之一:直接API
2020-11-06 20:59:00 【程序员欣宸】
欢迎访问我的GitHub
https://github.com/zq2599/blog_demos
内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;
本文是《Flink的DataSource三部曲》系列的第一篇,该系列旨在通过实战学习和了解Flink的DataSource,为以后的深入学习打好基础,由以下三部分组成:
- 直接API:即本篇,除了准备环境和工程,还学习了StreamExecutionEnvironment提供的用来创建数据来的API;
- 内置connector:StreamExecutionEnvironment的addSource方法,入参可以是flink内置的connector,例如kafka、RabbitMQ等;
- 自定义:StreamExecutionEnvironment的addSource方法,入参可以是自定义的SourceFunction实现类;
Flink的DataSource三部曲文章链接
关于Flink的DataSource
官方对DataSource的解释:Sources are where your program reads its input from,即DataSource是应用的数据来源,如下图的两个红框所示:
DataSource类型
对于常见的文本读入、kafka、RabbitMQ等数据来源,可以直接使用Flink提供的API或者connector,如果这些满足不了需求,还可以自己开发,下图是我按照自己的理解梳理的:
环境和版本
熟练掌握内置DataSource的最好办法就是实战,本次实战的环境和版本如下:
- JDK:1.8.0_211
- Flink:1.9.2
- Maven:3.6.0
- 操作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
- IDEA:2018.3.5 (Ultimate Edition)
源码下载
如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):
名称 | 链接 | 备注 |
---|---|---|
项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |
git仓库地址(ssh) | [email protected]:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
这个git项目中有多个文件夹,本章的应用在<font color="blue">flinkdatasourcedemo</font>文件夹下,如下图红框所示:
环境和版本
本次实战的环境和版本如下:
- JDK:1.8.0_211
- Flink:1.9.2
- Maven:3.6.0
- 操作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
- IDEA:2018.3.5 (Ultimate Edition)
创建工程
- 在控制台执行以下命令就会进入创建flink应用的交互模式,按提示输入gourpId和artifactId,就会创建一个flink应用(我输入的groupId是<font color="blue">com.bolingcavalry</font>,artifactId是<font color="blue">flinkdatasourcedemo</font>):
mvn \
archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.2
- 现在maven工程已生成,用IDEA导入这个工程,如下图:
- 以maven的类型导入:
- 导入成功的样子:
- 项目创建成功,可以开始写代码实战了;
辅助类Splitter
实战中有个功能常用到:将字符串用空格分割,转成Tuple2类型的集合,这里将此算子做成一个公共类Splitter.java,代码如下:
package com.bolingcavalry;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
if(StringUtils.isNullOrWhitespaceOnly(s)) {
System.out.println("invalid line");
return;
}
for(String word : s.split(" ")) {
collector.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
准备完毕,可以开始实战了,先从最简单的Socket开始。
Socket DataSource
Socket DataSource的功能是监听指定IP的指定端口,读取网络数据;
- 在刚才新建的工程中创建一个类Socket.java:
package com.bolingcavalry.api;
import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class Socket {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//监听本地9999端口,读取字符串
DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);
//每五秒钟一次,将当前五秒内所有字符串以空格分割,然后统计单词数量,打印出来
socketDataStream
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
.print();
env.execute("API DataSource demo : socket");
}
}
从上述代码可见,StreamExecutionEnvironment.socketTextStream就可以创建Socket类型的DataSource,在控制台执行命令<font color="blue">nc -lk 9999</font>,即可进入交互模式,此时输出任何字符串再回车,都会将字符串传输到本机9999端口;
- 在IDEA上运行Socket类,启动成功后再回到刚才执行<font color="blue">nc -lk 9999</font>的控制台,输入一些字符串再回车,可见Socket的功能已经生效:
集合DataSource(generateSequence)
- 基于集合的DataSource,API如下图所示:
2. 先试试最简单的generateSequence,创建指定范围内的数字型的DataSource:
package com.bolingcavalry.api;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class GenerateSequence {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//并行度为1
env.setParallelism(1);
//通过generateSequence得到Long类型的DataSource
DataStream<Long> dataStream = env.generateSequence(1, 10);
//做一次过滤,只保留偶数,然后打印
dataStream.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long aLong) throws Exception {
return 0L==aLong.longValue()%2L;
}
}).print();
env.execute("API DataSource demo : collection");
}
}
- 运行时会打印偶数:
集合DataSource(fromElements+fromCollection)
- fromElements和fromCollection就在一个类中试了吧,创建<font color="blue">FromCollection</font>类,里面是这两个API的用法:
package com.bolingcavalry.api;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
public class FromCollection {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//并行度为1
env.setParallelism(1);
//创建一个List,里面有两个Tuple2元素
List<Tuple2<String, Integer>> list = new ArrayList<>();
list.add(new Tuple2("aaa", 1));
list.add(new Tuple2("bbb", 1));
//通过List创建DataStream
DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);
//通过多个Tuple2元素创建DataStream
DataStream<Tuple2<String, Integer>> fromElementDataStream = env.fromElements(
new Tuple2("ccc", 1),
new Tuple2("ddd", 1),
new Tuple2("aaa", 1)
);
//通过union将两个DataStream合成一个
DataStream<Tuple2<String, Integer>> unionDataStream = fromCollectionDataStream.union(fromElementDataStream);
//统计每个单词的数量
unionDataStream
.keyBy(0)
.sum(1)
.print();
env.execute("API DataSource demo : collection");
}
}
- 运行结果如下:
文件DataSource
- 下面的ReadTextFile类会读取绝对路径的文本文件,并对内容做单词统计:
package com.bolingcavalry.api;
import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ReadTextFile {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度为1
env.setParallelism(1);
//用txt文件作为数据源
DataStream<String> textDataStream = env.readTextFile("file:///Users/zhaoqin/temp/202003/14/README.txt", "UTF-8");
//统计单词数量并打印出来
textDataStream
.flatMap(new Splitter())
.keyBy(0)
.sum(1)
.print();
env.execute("API DataSource demo : readTextFile");
}
}
- 请确保代码中的绝对路径下存在名为README.txt文件,运行结果如下:
3. 打开StreamExecutionEnvironment.java源码,看一下刚才使用的readTextFile方法实现如下,原来是调用了另一个同名方法,该方法的第三个参数确定了文本文件是一次性读取完毕,还是周期性扫描内容变更,而第四个参数就是周期性扫描的间隔时间:
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be null or blank.");
TextInputFormat format = new TextInputFormat(new Path(filePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName(charsetName);
return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
}
- 上面的FileProcessingMode是个枚举,源码如下:
@PublicEvolving
public enum FileProcessingMode {
/** Processes the current contents of the path and exits. */
PROCESS_ONCE,
/** Periodically scans the path for new data. */
PROCESS_CONTINUOUSLY
}
- 另外请关注<font color="blue">readTextFile</font>方法的<font color="red">filePath</font>参数,这是个URI类型的字符串,除了本地文件路径,还可以是HDFS的地址:<font color="blue">hdfs://host:port/file/path</font>
至此,通过直接API创建DataSource的实战就完成了,后面的章节我们继续学习内置connector方式的DataSource;
欢迎关注公众号:程序员欣宸
微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界... https://github.com/zq2599/blog_demos
版权声明
本文为[程序员欣宸]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4198380/blog/4703661
边栏推荐
- Not long after graduation, he earned 20000 yuan from private work!
- If PPT is drawn like this, can the defense of work report be passed?
- EOS创始人BM: UE,UBI,URI有什么区别?
- Arrangement of basic knowledge points
- 零基础打造一款属于自己的网页搜索引擎
- 一篇文章教会你使用HTML5 SVG 标签
- ES6学习笔记(四):教你轻松搞懂ES6的新增语法
- axios学习笔记(二):轻松弄懂XHR的使用及如何封装简易axios
- Vue 3 responsive Foundation
- 6.5 request to view name translator (in-depth analysis of SSM and project practice)
猜你喜欢
Vue 3 responsive Foundation
100元扫货阿里云是怎样的体验?
I'm afraid that the spread sequence calculation of arbitrage strategy is not as simple as you think
Thoughts on interview of Ali CCO project team
零基础打造一款属于自己的网页搜索引擎
Not long after graduation, he earned 20000 yuan from private work!
NLP model Bert: from introduction to mastery (1)
前端基础牢记的一些操作-Github仓库管理
vue-codemirror基本用法:实现搜索功能、代码折叠功能、获取编辑器值及时验证
Do not understand UML class diagram? Take a look at this edition of rural love class diagram, a learn!
随机推荐
Brief introduction and advantages and disadvantages of deepwalk model
加速「全民直播」洪流,如何攻克延时、卡顿、高并发难题?
钻石标准--Diamond Standard
Tool class under JUC package, its name is locksupport! Did you make it?
Common algorithm interview has been out! Machine learning algorithm interview - KDnuggets
前端都应懂的入门基础-github基础
PHP应用对接Justswap专用开发包【JustSwap.PHP】
6.5 request to view name translator (in-depth analysis of SSM and project practice)
Windows 10 tensorflow (2) regression analysis of principles, deep learning framework (gradient descent method to solve regression parameters)
IPFS/Filecoin合法性:保护个人隐私不被泄露
Arrangement of basic knowledge points
Not long after graduation, he earned 20000 yuan from private work!
Filecoin最新动态 完成重大升级 已实现四大项目进展!
一篇文章教会你使用Python网络爬虫下载酷狗音乐
ES6学习笔记(五):轻松了解ES6的内置扩展对象
Vuejs development specification
零基础打造一款属于自己的网页搜索引擎
Do not understand UML class diagram? Take a look at this edition of rural love class diagram, a learn!
Mongodb (from 0 to 1), 11 days mongodb primary to intermediate advanced secret
What is the side effect free method? How to name it? - Mario