Flink DataSource Trilogy, Part One: Direct API
2020-11-06 20:59:00 【Programmer Xinchen】
Welcome to visit my GitHub
https://github.com/zq2599/blog_demos
Content: a categorized index of all my original articles and their companion source code, covering Java, Docker, Kubernetes, DevOps, and more.
This is the first article in the Flink DataSource Trilogy, a series that explores Flink's DataSource through hands-on practice to lay a solid foundation for further study. It consists of three parts:
- Direct API: this article; besides preparing the environment and the project, it covers the DataSource-creating APIs that StreamExecutionEnvironment provides directly;
- Built-in connector: StreamExecutionEnvironment's addSource method, passing in one of Flink's built-in connectors, such as Kafka or RabbitMQ;
- Customization: StreamExecutionEnvironment's addSource method, passing in your own SourceFunction implementation;
Links to the Flink DataSource Trilogy
- Flink DataSource Trilogy, Part One: Direct API
- Flink DataSource Trilogy, Part Two: Built-in Connectors
- Flink DataSource Trilogy, Part Three: Customization
About Flink's DataSource
The official documentation describes a DataSource as follows: "Sources are where your program reads its input from." In other words, the DataSource is the application's data source, as shown in the two red boxes below:
DataSource types
For common cases such as reading text files, Kafka, RabbitMQ, and so on, you can directly use the APIs or connectors that Flink provides; if those do not meet your needs, you can also develop your own. The picture below shows my own mental model:
Environment and version
The best way to master the built-in DataSources is hands-on practice. The environment and versions used here are:
- JDK:1.8.0_211
- Flink:1.9.2
- Maven:3.6.0
- Operating system: macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
- IDEA:2018.3.5 (Ultimate Edition)
Source download
If you'd rather not type the code yourself, the complete source for this series can be downloaded from GitHub; the addresses and link information are listed in the table below (https://github.com/zq2599/blog_demos):
| Name | Link | Remarks |
| --- | --- | --- |
| Project home page | https://github.com/zq2599/blog_demos | The project's home page on GitHub |
| Git repository address (https) | https://github.com/zq2599/blog_demos.git | Repository URL of the project source, https protocol |
| Git repository address (ssh) | git@github.com:zq2599/blog_demos.git | Repository URL of the project source, ssh protocol |
This git project contains multiple folders; the application for this article is in the flinkdatasourcedemo folder, as shown in the red box below:
Create a project
- Run the following command in the console to enter Maven's interactive mode for creating a Flink application, then enter the groupId and artifactId at the prompts (I entered groupId com.bolingcavalry and artifactId flinkdatasourcedemo):
mvn \
archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.2
- The Maven project has now been generated; import it into IDEA, as shown below:
- Import it as a Maven-type project:
- After a successful import, the project looks like this:
- With the project created, you can start writing code;
Auxiliary class Splitter
One operation is used repeatedly throughout this article: splitting a string on spaces and turning the words into a collection of Tuple2 elements. Let's extract that operator into a shared class, Splitter.java, with the following code:
package com.bolingcavalry;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
        // skip null, empty, or whitespace-only lines
        if (StringUtils.isNullOrWhitespaceOnly(s)) {
            System.out.println("invalid line");
            return;
        }

        // emit a (word, 1) tuple for every space-separated word
        for (String word : s.split(" ")) {
            collector.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}
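To see what Splitter emits on its own, here is a tiny smoke test; this is my addition, not part of the original project, and the class name SplitterSmokeTest is mine:

package com.bolingcavalry;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// minimal sketch: exercise Splitter on one hard-coded line, outside
// any window or keyBy pipeline
public class SplitterSmokeTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("aaa bbb aaa")
                .flatMap(new Splitter())
                .print();   // expected: (aaa,1), (bbb,1), (aaa,1); line order may vary
        env.execute("Splitter smoke test");
    }
}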
With the preparation done, it's time for hands-on practice, starting with the simplest case: the socket DataSource.
Socket DataSource
A socket DataSource listens on a specified IP and port and reads data from the network;
- Create a class named Socket.java in the new project:
package com.bolingcavalry.api;

import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Socket {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // listen on local port 9999 and read strings from it
        DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);

        // every five seconds, split all strings received in that window on spaces,
        // count the occurrences of each word, and print the counts
        socketDataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("API DataSource demo : socket");
    }
}
As the code above shows, StreamExecutionEnvironment.socketTextStream creates a socket-type DataSource. Run the command nc -lk 9999 in a console to enter interactive mode; from then on, every string you type followed by Enter is sent to local port 9999;
- Run the Socket class in IDEA; once it has started, go back to the console running nc -lk 9999, type a few strings, and press Enter. You will see the word counts printed, which shows the socket DataSource is working:
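As an illustrative run (my example, not from the original article): typing aaa bbb aaa within one five-second window should produce (aaa,2) and (bbb,1). Since this class does not set the parallelism, each printed line may carry a subtask prefix such as 2>, and the order of the lines may vary between runs.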
Collection DataSource (generateSequence)
- StreamExecutionEnvironment provides several collection-based DataSource APIs, as shown in the figure below:
- Let's first try the simplest one, generateSequence, which creates a DataSource of numbers in a specified range:
package com.bolingcavalry.api;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GenerateSequence {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // set the parallelism to 1
        env.setParallelism(1);

        // generateSequence creates a DataSource of Long values from 1 to 10
        DataStream<Long> dataStream = env.generateSequence(1, 10);

        // keep only the even numbers, then print them
        dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return aLong % 2 == 0;
            }
        }).print();

        env.execute("API DataSource demo : collection");
    }
}
- Only the even numbers are printed when it runs:
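A side note that goes beyond the original article: in newer Flink releases (1.12 and later, as far as I know) generateSequence is deprecated in favor of fromSequence, so the equivalent call there would look roughly like this:

// hedged sketch: on Flink 1.12+ fromSequence replaces the deprecated
// generateSequence; this does not apply to the 1.9.2 used in this article
DataStream<Long> dataStream = env.fromSequence(1, 10);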
Collection DataSource (fromElements + fromCollection)
- Let's try fromElements and fromCollection in a single class. Create a FromCollection class that demonstrates the usage of both APIs:
package com.bolingcavalry.api;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class FromCollection {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // set the parallelism to 1
        env.setParallelism(1);

        // build a List holding two Tuple2 elements
        List<Tuple2<String, Integer>> list = new ArrayList<>();
        list.add(new Tuple2<>("aaa", 1));
        list.add(new Tuple2<>("bbb", 1));

        // create a DataStream from the List
        DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);

        // create a DataStream directly from several Tuple2 elements
        DataStream<Tuple2<String, Integer>> fromElementDataStream = env.fromElements(
                new Tuple2<>("ccc", 1),
                new Tuple2<>("ddd", 1),
                new Tuple2<>("aaa", 1)
        );

        // union merges the two DataStreams into one
        DataStream<Tuple2<String, Integer>> unionDataStream = fromCollectionDataStream.union(fromElementDataStream);

        // count the occurrences of each word
        unionDataStream
                .keyBy(0)
                .sum(1)
                .print();

        env.execute("API DataSource demo : collection");
    }
}
- Running it prints the merged word counts:
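Besides a List, a stream can also be built from an iterator. Here is a sketch of that route (my addition, not in the original project), which would slot into a main method like the ones above and needs import org.apache.flink.util.NumberSequenceIterator;:

// NumberSequenceIterator is a serializable SplittableIterator shipped with Flink,
// so it can back a parallel source; a plain ArrayList iterator would fail here,
// because the source function (and thus the iterator) must be serializable
DataStream<Long> parallelStream =
        env.fromParallelCollection(new NumberSequenceIterator(1L, 10L), Long.class);
parallelStream.print();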
File DataSource
- The ReadTextFile class below reads a text file at an absolute path and runs word counting on its contents:
package com.bolingcavalry.api;

import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFile {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // set the parallelism to 1
        env.setParallelism(1);

        // use a local text file as the data source
        DataStream<String> textDataStream = env.readTextFile("file:///Users/zhaoqin/temp/202003/14/README.txt", "UTF-8");

        // count the occurrences of each word and print the result
        textDataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .sum(1)
                .print();

        env.execute("API DataSource demo : readTextFile");
    }
}
- Make sure a file named README.txt actually exists at the absolute path used in the code; the result of running the program is as follows:
- Open the StreamExecutionEnvironment.java source and look at the readTextFile method used above. It turns out to delegate to an overload of the same name, whose third parameter determines whether the text file is read once or periodically re-scanned for changes, and whose fourth parameter is the interval between scans:
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
    Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be null or blank.");

    TextInputFormat format = new TextInputFormat(new Path(filePath));
    format.setFilesFilter(FilePathFilter.createDefaultFilter());
    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
    format.setCharsetName(charsetName);

    return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
}
- The FileProcessingMode above is an enum whose source is:
@PublicEvolving
public enum FileProcessingMode {

    /** Processes the current contents of the path and exits. */
    PROCESS_ONCE,

    /** Periodically scans the path for new data. */
    PROCESS_CONTINUOUSLY
}
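To watch a file for changes instead of reading it once, you can call readFile yourself with PROCESS_CONTINUOUSLY. Below is a minimal sketch against the Flink 1.9 API (my addition, not from the original article); it belongs inside a main method like the one in ReadTextFile and needs imports for org.apache.flink.api.java.io.TextInputFormat, org.apache.flink.core.fs.Path, and org.apache.flink.streaming.api.functions.source.FileProcessingMode:

// re-scan the file every 10 seconds; note that PROCESS_CONTINUOUSLY
// re-processes the entire file whenever it changes, not just the new lines
String path = "file:///Users/zhaoqin/temp/202003/14/README.txt";
TextInputFormat format = new TextInputFormat(new Path(path));
DataStream<String> watched = env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000L);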
- Also note the filePath parameter of readTextFile: it is a URI string, so besides a local file path it can also be an HDFS address such as hdfs://host:port/file/path
That wraps up our hands-on practice of creating DataSources through the direct API. Next, we will continue with DataSources based on built-in connectors;
Welcome to follow my official account: Programmer Xinchen
Search WeChat for 「Programmer Xinchen」. I'm Xinchen, looking forward to exploring the Java world with you... https://github.com/zq2599/blog_demos
Copyright notice
This article was written by [Programmer Xinchen]. Please include a link to the original when reposting. Thanks.