Flink - Counting words with the DataStream API in streaming and batch modes
2022-07-02 09:08:00 【Programmer Qiqi】
Execution mode (streaming/batch)
The DataStream API supports different runtime execution modes, from which you can choose depending on the requirements of your use case and the characteristics of your job.
The DataStream API has a "classic" execution behavior, which we call STREAMING execution mode. It should be used for unbounded jobs that require continuous incremental processing and are expected to stay online indefinitely.
In addition, there is a batch-style execution mode, which we call BATCH execution mode. It executes jobs in a way that is more reminiscent of batch processing frameworks such as MapReduce. It should be used for bounded jobs that have a known, fixed input and do not run continuously.
Apache Flink's unified approach to stream and batch processing means that a DataStream application executed over bounded input produces the same final results regardless of the configured execution mode. It is important to note what "final" means here: a job executing in STREAMING mode may produce incremental updates (think of upserts in a database), whereas a BATCH job produces only one final result at the end. Although they are computed differently, the final results are the same as long as the updates are interpreted correctly.
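To make the meaning of "final" concrete, here is a plain-Java simulation of a keyed word count (no Flink dependency; the class and method names are made up for illustration). The streaming variant emits an updated count for every incoming word, like an upsert; the batch variant emits one count per word after reading all of the input. Applying the streaming updates as upserts, so that later values overwrite earlier ones, yields exactly the batch result.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the two result styles; this is not Flink code.
final class FinalResultDemo {

    // STREAMING-style: emit (word, runningCount) for every input record.
    static List<Map.Entry<String, Integer>> streamingUpdates(List<String> words) {
        Map<String, Integer> state = new LinkedHashMap<>();
        List<Map.Entry<String, Integer>> updates = new ArrayList<>();
        for (String w : words) {
            int count = state.merge(w, 1, Integer::sum); // update this key's running count
            updates.add(Map.entry(w, count));
        }
        return updates;
    }

    // BATCH-style: consume the whole bounded input, then emit one count per word.
    static Map<String, Integer> batchResult(List<String> words) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String w : words) {
            counts.merge(w, 1, Integer::sum);
        }
        return counts;
    }

    // Interpret the updates as upserts: a later value overwrites an earlier one for the same key.
    static Map<String, Integer> applyUpserts(List<Map.Entry<String, Integer>> updates) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> u : updates) {
            table.put(u.getKey(), u.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<String> input = List.of("flink", "batch", "flink");
        System.out.println(streamingUpdates(input));               // [flink=1, batch=1, flink=2]
        System.out.println(batchResult(input));                    // {flink=2, batch=1}
        System.out.println(applyUpserts(streamingUpdates(input))); // {flink=2, batch=1}
    }
}
```

The intermediate record flink=1 is not "wrong"; it is simply superseded once flink=2 arrives.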
Enabling BATCH execution allows Flink to apply additional optimizations that are only possible when the input is known to be bounded. For example, different join/aggregation strategies can be used, together with a different shuffle implementation that allows more efficient task scheduling and different failure recovery behavior. Next, we look at some details of the execution behavior.
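One example of such a strategy: because the whole input is known, keyed input in BATCH mode can be sorted by key and each key's records processed in turn, so only the state of the current key has to be kept, whereas a streaming job must keep a counter for every word it has seen so far. The plain-Java sketch below illustrates that sort-then-aggregate idea for a word count; it is an illustration only, not Flink's internal implementation, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Illustrative sort-based aggregation over a bounded input; not Flink internals.
final class SortBasedCount {

    static List<Map.Entry<String, Integer>> count(List<String> boundedInput) {
        // Sorting is only possible because the entire input is available up front.
        List<String> sorted = new ArrayList<>(boundedInput);
        sorted.sort(Comparator.naturalOrder());

        List<Map.Entry<String, Integer>> results = new ArrayList<>();
        String currentKey = null; // the only key whose state is held at any time
        int currentCount = 0;
        for (String word : sorted) {
            if (!word.equals(currentKey)) {
                // Key changed: the previous key is complete, so emit it and reset the state.
                if (currentKey != null) {
                    results.add(Map.entry(currentKey, currentCount));
                }
                currentKey = word;
                currentCount = 0;
            }
            currentCount++;
        }
        if (currentKey != null) {
            results.add(Map.entry(currentKey, currentCount)); // flush the last key
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(count(List.of("stream", "batch", "stream", "flink")));
        // [batch=1, flink=1, stream=2]
    }
}
```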
When can/should I use BATCH execution mode?
BATCH execution mode can only be used for jobs/Flink programs that are bounded. Boundedness is a property of a data source that tells us whether all the input coming from that source is known before execution, or whether new data will keep arriving, potentially indefinitely. A job, in turn, is bounded if all of its sources are bounded, and unbounded otherwise.
STREAMING execution mode, on the other hand, can be used for both bounded and unbounded jobs.
As a rule of thumb, you should use BATCH execution mode when your program is bounded, because it will be more efficient. You have to use STREAMING execution mode when your program is unbounded, because only this mode is general enough to process continuous data streams.
One obvious exception is when you want to use a bounded job to bootstrap some job state that you then use in an unbounded job. For example, you run a bounded job in STREAMING mode, take a savepoint, and then restore that savepoint on an unbounded job. This is a very specific use case, and one that might soon become obsolete once a savepoint can be produced as an additional output of a BATCH job.
Another case where you might run a bounded job in STREAMING mode is when writing tests for code that will eventually run with unbounded sources. For testing, it can be more natural to use a bounded source in those cases.
Configuring BATCH execution mode
The execution mode can be configured via the execution.runtime-mode setting. There are three possible values:
- STREAMING: the classic DataStream execution mode (default)
- BATCH: batch-style execution on the DataStream API
- AUTOMATIC: let the system decide based on the boundedness of the sources
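The interplay between source boundedness and these three values can be modelled in a few lines. This is a simplified plain-Java model, not Flink's implementation: Mode and SourceBoundedness are local stand-in enums (Flink has its own RuntimeExecutionMode type), and the code only encodes the rules stated in this article: a job is bounded only if all its sources are bounded, BATCH is only valid for bounded jobs, STREAMING works for both, and AUTOMATIC picks BATCH for bounded jobs and STREAMING otherwise.

```java
import java.util.List;

// Simplified model of execution-mode resolution; local stand-in types, not Flink's.
final class RuntimeModeModel {

    enum Mode { STREAMING, BATCH, AUTOMATIC }

    enum SourceBoundedness { BOUNDED, UNBOUNDED }

    // A job is bounded only if every one of its sources is bounded.
    static boolean isJobBounded(List<SourceBoundedness> sources) {
        return sources.stream().allMatch(s -> s == SourceBoundedness.BOUNDED);
    }

    // Returns the mode the job effectively runs in under this model.
    static Mode resolve(Mode configured, List<SourceBoundedness> sources) {
        boolean bounded = isJobBounded(sources);
        switch (configured) {
            case STREAMING:
                return Mode.STREAMING; // valid for bounded and unbounded jobs
            case BATCH:
                if (!bounded) {
                    throw new IllegalArgumentException("BATCH mode requires all sources to be bounded");
                }
                return Mode.BATCH;
            case AUTOMATIC:
            default:
                return bounded ? Mode.BATCH : Mode.STREAMING;
        }
    }

    public static void main(String[] args) {
        List<SourceBoundedness> fileJob = List.of(SourceBoundedness.BOUNDED);    // e.g. a text file
        List<SourceBoundedness> kafkaJob = List.of(SourceBoundedness.UNBOUNDED); // e.g. a Kafka topic
        System.out.println(resolve(Mode.AUTOMATIC, fileJob));  // BATCH
        System.out.println(resolve(Mode.AUTOMATIC, kafkaJob)); // STREAMING
        System.out.println(resolve(Mode.STREAMING, fileJob));  // STREAMING
    }
}
```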
This can be configured via command-line parameters of bin/flink run ..., or programmatically when creating/configuring the StreamExecutionEnvironment.
Here is how to configure the execution mode via the command line:
$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
This example shows how to configure the execution mode in code:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
We recommend not setting the runtime mode in the program, but setting it on the command line when submitting the application instead. Keeping the application code free of this configuration makes the program more flexible, because the same application can then be executed in any execution mode.
Word count example
Counting words in batch mode
Workflow: read the text file → split each line into words → group by word → sum the frequencies → print the results

Core code
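import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// Word and TokenizerFunction are classes from the author's repository (linked at the end)

// Read the required "input" argument (passed as -input <path>)
ParameterTool parameterFromArgs = ParameterTool.fromArgs(args);
String input = parameterFromArgs.getRequired("input");
// Initialize the environment and switch it to BATCH execution mode
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// Load the data source: read the text file line by line as UTF-8
DataStreamSource<String> wordSource = env.readTextFile(input, "UTF-8");
// Transform each line into Word records
SingleOutputStreamOperator<Word> wordStreamOperator = wordSource.flatMap(new TokenizerFunction());
// Group by word
KeyedStream<Word, String> wordKeyedStream = wordStreamOperator.keyBy(new KeySelector<Word, String>() {
    @Override
    public String getKey(Word word) throws Exception {
        return word.getWord();
    }
});
// Sum the frequency field per word
SingleOutputStreamOperator<Word> sumStream = wordKeyedStream.sum("frequency");
sumStream.print();
env.execute("WordCountBatch");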
- When running in the IDE, pass the -input argument with the path of the input file.
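The code above relies on two classes from the author's repository, Word and TokenizerFunction, whose source is not shown in this article. As a hedged sketch of what they are assumed to do, the plain-Java code below models Word as a holder of a word and its frequency, plus tokenization logic that turns a line into lower-case words with frequency 1. The splitting rule (on non-word characters) and the names TokenizerSketch and tokenize are assumptions for illustration. In the real job this logic would live inside a Flink FlatMapFunction<String, Word>, whose flatMap(String, Collector<Word>) method calls collect once per token; and for sum("frequency") to address the field by name, Word has to be a valid Flink POJO (public class, public no-argument constructor, and getters/setters or public fields).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-ins for the repository's Word and TokenizerFunction.
final class TokenizerSketch {

    // POJO-style holder: public no-arg constructor plus getters/setters,
    // which is what Flink needs to address the "frequency" field by name.
    public static class Word {
        private String word;
        private int frequency;

        public Word() {}

        public Word(String word, int frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        public String getWord() { return word; }
        public void setWord(String word) { this.word = word; }
        public int getFrequency() { return frequency; }
        public void setFrequency(int frequency) { this.frequency = frequency; }

        @Override
        public String toString() { return word + ":" + frequency; }
    }

    // Assumed tokenization rule: lower-case the line, split on non-word characters,
    // drop empty tokens, and emit each word with frequency 1.
    static List<Word> tokenize(String line) {
        List<Word> out = new ArrayList<>();
        for (String token : line.toLowerCase(Locale.ROOT).split("\\W+")) {
            if (!token.isEmpty()) {
                out.add(new Word(token, 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("Hello Flink, hello batch!"));
        // [hello:1, flink:1, hello:1, batch:1]
    }
}
```

Emitting a frequency of 1 per occurrence is what lets the downstream sum("frequency") produce the count for each word.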

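Counting words in streaming mode
Workflow: consume lines from the Kafka topic → split each line into words → group by word → sum the frequencies → print the continuously updated counts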

Core code
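import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// Word and TokenizerFunction are classes from the author's repository (linked at the end)

// Initialize the environment; no runtime mode is set, so the default STREAMING mode applies
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the Kafka source: read string values from TOPIC_WORD, starting from the latest offsets
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("192.168.0.192:9092")
        .setTopics("TOPIC_WORD")
        .setGroupId("TEST_GROUP")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
// Load the data source (no watermarks, since no event-time logic is used)
DataStreamSource<String> kafkaWordSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Word Source");
// Transform each line into Word records
SingleOutputStreamOperator<Word> wordStreamOperator = kafkaWordSource.flatMap(new TokenizerFunction());
// Group by word
KeyedStream<Word, String> wordKeyedStream = wordStreamOperator.keyBy(new KeySelector<Word, String>() {
    @Override
    public String getKey(Word word) throws Exception {
        return word.getWord();
    }
});
// Sum the frequency field per word; each new record emits an updated count
SingleOutputStreamOperator<Word> sumStream = wordKeyedStream.sum("frequency");
sumStream.print();
env.execute("WordCountStream");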
Full source code
https://github.com/Mr-LuXiaoHua/study-flink
com.example.datastream.wordcount.DataStreamApiWordCountBatch: reads data from a file and counts words
com.example.datastream.wordcount.DataStreamApiWordCountStream: consumes data from Kafka and counts words
Submitting the batch job to a Flink cluster:
bin/flink run -m 127.0.0.1:8081 -c com.example.datastream.wordcount.DataStreamApiWordCountBatch /opt/apps/study-flink-1.0.jar -input /mnt/data/words.txt
-input specifies the input file path. Program arguments such as -input must follow the JAR path; anything placed before it is interpreted as an option of flink run itself.