Flink DataSource Trilogy, Part 1: Direct API
2020-11-06 20:59:00 【Programmer Xinchen】
Welcome to my GitHub
https://github.com/zq2599/blog_demos
Contents: a categorized index of all my original articles plus the accompanying source code, covering Java, Docker, Kubernetes, DevOps, and more;
This article is the first in the 《Flink DataSource Trilogy》 series. The series uses hands-on examples to build a working understanding of Flink's DataSource and lay a foundation for further study. It consists of three parts:
- Direct API: this article. Besides preparing the environment and the project, it covers the APIs that StreamExecutionEnvironment provides for creating data sources;
- Built-in connectors: the addSource method of StreamExecutionEnvironment, whose argument can be one of Flink's built-in connectors, such as Kafka or RabbitMQ;
- Custom: the addSource method of StreamExecutionEnvironment, whose argument can be a custom implementation of SourceFunction;
Links to the Flink DataSource Trilogy articles
- 《Flink DataSource Trilogy, Part 1: Direct API》
- 《Flink DataSource Trilogy, Part 2: Built-in Connectors》
- 《Flink DataSource Trilogy, Part 3: Custom》
About Flink's DataSource
The official explanation of DataSource is: "Sources are where your program reads its input from". In other words, the DataSource is where the application gets its data, as shown in the two red boxes in the figure below:
DataSource types
For common sources such as text files, Kafka and RabbitMQ, you can use the APIs or connectors that Flink provides directly. If these don't meet your needs, you can also develop your own. The figure below shows my own understanding:
Environment and version
The best way to master the built-in DataSources is hands-on practice. The environment and versions used are:
- JDK:1.8.0_211
- Flink:1.9.2
- Maven:3.6.0
- Operating system: macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
- IDEA:2018.3.5 (Ultimate Edition)
Source code download
If you'd rather not write the code yourself, the source code for the whole series can be downloaded from GitHub. The addresses are listed in the table below (https://github.com/zq2599/blog_demos):
| Name | Link | Notes |
|---|---|---|
| Project home page | https://github.com/zq2599/blog_demos | The project's home page on GitHub |
| Git repository URL (https) | https://github.com/zq2599/blog_demos.git | Repository URL of the project's source code, HTTPS protocol |
| Git repository URL (ssh) | git@github.com:zq2599/blog_demos.git | Repository URL of the project's source code, SSH protocol |
This git project contains multiple folders. The application for this chapter is in the <font color="blue">flinkdatasourcedemo</font> folder, as shown in the red box below:
Create the project
- Run the following command in a console to start Maven's interactive mode for creating a Flink application. Enter the groupId and artifactId when prompted, and a Flink application is generated (I entered groupId <font color="blue">com.bolingcavalry</font> and artifactId <font color="blue">flinkdatasourcedemo</font>):
```shell
mvn \
    archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.9.2
```
- The Maven project has now been generated. Import it into IDEA, as shown below:

- Import it as a Maven project:

- This is what a successful import looks like:

- The project is created, and you can start writing code;
Helper class Splitter
A commonly needed operation is to split a string on spaces and turn it into a collection of Tuple2 values. Let's make this operator a shared class, Splitter.java:
```java
package com.bolingcavalry;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
        // Skip null, empty and whitespace-only lines
        if (StringUtils.isNullOrWhitespaceOnly(s)) {
            System.out.println("invalid line");
            return;
        }

        // Emit (word, 1) for every space-separated word
        for (String word : s.split(" ")) {
            collector.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}
```
With everything ready, it's time for some hands-on work, starting with the simplest source: Socket.
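Before wiring Splitter into a job, it helps to see exactly what it emits. The plain-Java sketch below mirrors its flatMap logic without any Flink dependency. To make it run on a bare JDK, it makes three substitutions: Map.Entry stands in for Tuple2, a returned List stands in for the Collector, and the whitespace check is re-implemented locally. It also exposes a side effect of split(" "): two consecutive spaces produce an empty-string word.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of Splitter.flatMap (no Flink dependency):
// Map.Entry stands in for Tuple2, the returned List stands in for the Collector.
class SplitterSketch {

    // Local re-implementation of the null/whitespace check Splitter relies on
    static boolean isNullOrWhitespaceOnly(String s) {
        if (s == null || s.isEmpty()) {
            return true;
        }
        for (int i = 0; i < s.length(); i++) {
            if (!Character.isWhitespace(s.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    static List<Map.Entry<String, Integer>> split(String s) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        if (isNullOrWhitespaceOnly(s)) {
            return out; // Splitter prints "invalid line" and emits nothing
        }
        for (String word : s.split(" ")) {
            out.add(new SimpleEntry<>(word, 1)); // one (word, 1) pair per token
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(split("hello world hello")); // [hello=1, world=1, hello=1]
        System.out.println(split("a  b"));              // [a=1, =1, b=1]
        System.out.println(split("   "));               // []
    }
}
```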
Socket DataSource
A Socket DataSource connects to the specified port on the specified host and reads text from the network;
- Create a class Socket.java in the new project:
```java
package com.bolingcavalry.api;

import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Socket {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Connect to port 9999 on localhost and read the incoming text line by line
        DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);

        // Every five seconds, split all lines received in that window on spaces,
        // count each word and print the result
        socketDataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("API DataSource demo : socket");
    }
}
```
As the code shows, StreamExecutionEnvironment.socketTextStream creates a Socket-type DataSource. Run <font color="blue">nc -lk 9999</font> in a console to enter interactive mode. From then on, every string you type followed by Enter is sent to port 9999 on the local machine. Start nc before the job, because socketTextStream connects to that port as a client;
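Note the direction of the connection. socketTextStream acts as a client: it connects to host:port and treats each newline-terminated line as one record. nc -lk 9999 is the server. The sketch below reproduces both roles with plain JDK sockets on one machine. The class and method names are hypothetical, and nothing here calls Flink. A background thread plays nc and writes two lines, and the main thread connects and reads them line by line.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-JDK model of the two roles: the "nc" thread is the server,
// the main thread is the client, as socketTextStream is.
class SocketRolesSketch {

    static List<String> run(List<String> linesToSend) {
        // Port 0 lets the OS pick a free port instead of hard-coding 9999
        try (ServerSocket server = new ServerSocket(0)) {
            Thread nc = new Thread(() -> {
                try (Socket conn = server.accept();
                     PrintWriter out = new PrintWriter(
                             new OutputStreamWriter(conn.getOutputStream(), StandardCharsets.UTF_8), true)) {
                    for (String line : linesToSend) {
                        out.println(line); // one newline-terminated line = one record
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            nc.start();

            List<String> received = new ArrayList<>();
            // Client side: connect to the server and read until it closes the connection
            try (Socket client = new Socket("localhost", server.getLocalPort());
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    received.add(line);
                }
            }
            nc.join();
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("hello world", "hello flink"))); // [hello world, hello flink]
    }
}
```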
- Run the Socket class in IDEA. Once it has started, go back to the console running <font color="blue">nc -lk 9999</font> and enter some strings, each followed by Enter. The output shows that the Socket DataSource is working:

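The keyBy(0).timeWindow(Time.seconds(5)).sum(1) chain prints one count per word per fixed, non-overlapping 5-second bucket. The sketch below models only that bucketing, in plain Java. The timestamps are hypothetical inputs. In the real job they come from processing time, the default time characteristic in Flink 1.9, so which bucket a line lands in depends on when you press Enter.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of keyBy(word) + 5-second tumbling window + sum(1).
// Timestamps are supplied by the caller (hypothetical values), not read from a clock.
class TumblingWindowSketch {

    static final long WINDOW_MS = 5_000L; // mirrors Time.seconds(5)

    // Returns windowStart -> (word -> count)
    static Map<Long, Map<String, Integer>> count(long[] timestampsMs, String[] lines) {
        Map<Long, Map<String, Integer>> windows = new TreeMap<>();
        for (int i = 0; i < lines.length; i++) {
            if (lines[i].trim().isEmpty()) {
                continue; // blank lines are skipped, as in Splitter
            }
            // Tumbling-window bucket for a non-negative timestamp with zero offset
            long windowStart = timestampsMs[i] - (timestampsMs[i] % WINDOW_MS);
            Map<String, Integer> counts = windows.computeIfAbsent(windowStart, k -> new TreeMap<>());
            for (String word : lines[i].split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] ts = {1_000L, 3_000L, 6_000L};
        String[] lines = {"hello world", "hello", "hello"};
        // Lines at 1 s and 3 s share the [0, 5 s) window; the line at 6 s falls in [5 s, 10 s)
        System.out.println(count(ts, lines)); // {0={hello=2, world=1}, 5000={hello=1}}
    }
}
```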
Collection DataSource (generateSequence)
- The collection-based DataSource APIs are shown in the figure below:
- First, try the simplest one, generateSequence, which creates a DataSource of numbers in a given range:
```java
package com.bolingcavalry.api;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GenerateSequence {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism to 1
        env.setParallelism(1);

        // Get a DataSource of Long values from 1 to 10 via generateSequence
        DataStream<Long> dataStream = env.generateSequence(1, 10);

        // Filter to keep only even numbers, then print them
        dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return 0L == aLong.longValue() % 2L;
            }
        }).print();

        env.execute("API DataSource demo : generateSequence");
    }
}
```
- When it runs, the even numbers are printed:

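For reference, the job above computes the same values as the plain-Java stream below. generateSequence(1, 10) includes both ends, and the filter keeps the even numbers. This is an equivalent computation for checking the expected output, not how Flink runs it. generateSequence is a parallel source, so with a parallelism above 1 the numbers are split across subtasks and the printed order may interleave. That is presumably why the demo sets the parallelism to 1.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Plain-Java equivalent of generateSequence(from, to) followed by the even-number filter
class GenerateSequenceSketch {

    static List<Long> evens(long from, long to) {
        return LongStream.rangeClosed(from, to) // both ends inclusive, like generateSequence
                .filter(n -> n % 2L == 0L)      // same predicate as the FilterFunction
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(evens(1, 10)); // [2, 4, 6, 8, 10]
    }
}
```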
Collection DataSource (fromElements + fromCollection)
- Try fromElements and fromCollection in a single class. Create a <font color="blue">FromCollection</font> class that uses both APIs:
```java
package com.bolingcavalry.api;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class FromCollection {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism to 1
        env.setParallelism(1);

        // Create a List holding two Tuple2 elements (typed, to avoid raw-type warnings)
        List<Tuple2<String, Integer>> list = new ArrayList<>();
        list.add(new Tuple2<String, Integer>("aaa", 1));
        list.add(new Tuple2<String, Integer>("bbb", 1));

        // Create a DataStream from the List
        DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);

        // Create a DataStream from several Tuple2 elements
        DataStream<Tuple2<String, Integer>> fromElementDataStream = env.fromElements(
                new Tuple2<String, Integer>("ccc", 1),
                new Tuple2<String, Integer>("ddd", 1),
                new Tuple2<String, Integer>("aaa", 1)
        );

        // Merge the two DataStreams into one with union
        DataStream<Tuple2<String, Integer>> unionDataStream = fromCollectionDataStream.union(fromElementDataStream);

        // Count each word
        unionDataStream
                .keyBy(0)
                .sum(1)
                .print();

        env.execute("API DataSource demo : collection");
    }
}
```
- The output is as follows:

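Without a window, keyBy(0).sum(1) is a rolling aggregation. Every incoming element emits the updated running total for its key, so aaa appears twice, first as (aaa,1) and later as (aaa,2). The plain-Java sketch below models that behaviour for one assumed arrival order. In the real job, union gives no ordering guarantee between its two inputs, so the printed order can differ while the final totals stay the same.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of keyBy(0).sum(1) on an unwindowed stream
class RollingSumSketch {

    // For each (word, count) input, emit (word, runningTotal)
    static List<Map.Entry<String, Integer>> rollingSum(List<Map.Entry<String, Integer>> input) {
        Map<String, Integer> state = new HashMap<>(); // per-key running totals
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : input) {
            int total = state.merge(e.getKey(), e.getValue(), Integer::sum);
            out.add(new SimpleEntry<>(e.getKey(), total));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = new ArrayList<>();
        for (String w : new String[]{"aaa", "bbb", "ccc", "ddd", "aaa"}) {
            input.add(new SimpleEntry<>(w, 1));
        }
        System.out.println(rollingSum(input)); // [aaa=1, bbb=1, ccc=1, ddd=1, aaa=2]
    }
}
```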
File DataSource
- The ReadTextFile class below reads a text file at an absolute path and counts the words in it:
```java
package com.bolingcavalry.api;

import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFile {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism to 1
        env.setParallelism(1);

        // Use a txt file as the data source
        DataStream<String> textDataStream = env.readTextFile("file:///Users/zhaoqin/temp/202003/14/README.txt", "UTF-8");

        // Count the words and print the results
        textDataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .sum(1)
                .print();

        env.execute("API DataSource demo : readTextFile");
    }
}
```
- Make sure a file named README.txt exists at the absolute path used in the code. The output is as follows:
- Open the StreamExecutionEnvironment.java source and look at readTextFile, shown below. It delegates to another method, readFile. The third argument of readFile decides whether the text file is read once or scanned periodically for changes, and the fourth argument is the interval between scans:
```java
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
    Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be null or blank.");

    TextInputFormat format = new TextInputFormat(new Path(filePath));
    format.setFilesFilter(FilePathFilter.createDefaultFilter());
    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
    format.setCharsetName(charsetName);

    return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
}
```
- The FileProcessingMode used above is an enum. Its source is:
```java
@PublicEvolving
public enum FileProcessingMode {

    /** Processes the current contents of the path and exits. */
    PROCESS_ONCE,

    /** Periodically scans the path for new data. */
    PROCESS_CONTINUOUSLY
}
```
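The difference between the two modes can be modelled with a small directory scanner, shown below as a plain-Java sketch. It assumes a simplified rule: the monitor remembers the newest modification time it has accepted, and each scan picks up only regular files modified after that. PROCESS_ONCE corresponds to a single scan, and PROCESS_CONTINUOUSLY repeats the scan every interval milliseconds. This is an illustration, not Flink's monitoring code. It does reproduce one behaviour that the readFile Javadoc warns about: a modified file is re-processed in its entirety.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified model of a continuous file monitor (not Flink's implementation)
class ContinuousScanSketch {

    // Newest modification time accepted by any earlier scan
    private long maxSeenModTime = Long.MIN_VALUE;

    // One scan: pick regular files modified after everything seen so far
    List<String> scan(File dir) {
        List<String> picked = new ArrayList<>();
        long newest = maxSeenModTime;
        File[] files = dir.listFiles();
        if (files != null) {
            for (File f : files) {
                long mod = f.lastModified();
                if (f.isFile() && mod > maxSeenModTime) {
                    picked.add(f.getName());
                    newest = Math.max(newest, mod);
                }
            }
        }
        maxSeenModTime = newest;
        Collections.sort(picked);
        return picked;
    }

    // Runs four scans against a temp directory, changing files between scans
    static List<List<String>> demo() {
        try {
            Path dir = Files.createTempDirectory("scan-sketch");
            write(dir, "a.txt", "hello").setLastModified(10_000L);
            ContinuousScanSketch monitor = new ContinuousScanSketch();
            List<List<String>> results = new ArrayList<>();
            results.add(monitor.scan(dir.toFile())); // [a.txt]: PROCESS_ONCE would stop here
            results.add(monitor.scan(dir.toFile())); // []: nothing new
            write(dir, "b.txt", "world").setLastModified(20_000L);
            results.add(monitor.scan(dir.toFile())); // [b.txt]: new file picked up
            write(dir, "a.txt", "hello again").setLastModified(30_000L);
            results.add(monitor.scan(dir.toFile())); // [a.txt]: modified file is read again in full
            return results;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static File write(Path dir, String name, String text) throws IOException {
        return Files.write(dir.resolve(name), text.getBytes(StandardCharsets.UTF_8)).toFile();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[a.txt], [], [b.txt], [a.txt]]
    }
}
```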
- Also note the <font color="red">filePath</font> parameter of <font color="blue">readTextFile</font>. It is a URI string, so besides a local file path it can also be an HDFS address: <font color="blue">hdfs://host:port/file/path</font>
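The scheme part of that URI determines which file system handles the path. The plain-Java sketch below only extracts the scheme with java.net.URI, to show how the paths from this article differ. The address hdfs://namenode:8020 is hypothetical, and the sketch does not show which Flink FileSystem implementation gets picked. A path without a scheme falls back to Flink's default file system, typically the local one unless fs.default-scheme is configured otherwise.

```java
import java.net.URI;

// Extracts the URI scheme that decides which file system a path belongs to
class SchemeSketch {

    // Returns the scheme, or "(none)" when the path has no scheme
    static String schemeOf(String path) {
        String scheme = URI.create(path).getScheme();
        return scheme == null ? "(none)" : scheme;
    }

    public static void main(String[] args) {
        System.out.println(schemeOf("file:///Users/zhaoqin/temp/202003/14/README.txt")); // file
        System.out.println(schemeOf("hdfs://namenode:8020/file/path"));                  // hdfs
        System.out.println(schemeOf("/tmp/README.txt"));                                 // (none)
    }
}
```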
That completes the hands-on tour of creating DataSources through the direct API. Next, we'll look at DataSources built on Flink's built-in connectors;
Welcome to follow my WeChat official account: Programmer Xinchen
Search for 「Programmer Xinchen」 on WeChat. I'm Xinchen, and I look forward to exploring the Java world with you... https://github.com/zq2599/blog_demos
Copyright notice
This article was written by [Programmer Xinchen]. Please include a link to the original when reposting. Thank you.