Flink - Use the unified stream/batch API to count words
2022-07-02 09:08:00 【Programmer Qiqi】
Execution mode (streaming/batch)
The DataStream API supports different runtime execution modes. You can choose between them according to the requirements of your use case and the characteristics of your job.
The DataStream API has a "classic" execution behavior, which we call STREAMING execution mode. It is suited to unbounded jobs that require continuous incremental processing and are expected to stay online indefinitely.
In addition, there is a batch-style execution mode, which we call BATCH execution mode. It executes jobs in a way that is more reminiscent of batch processing frameworks such as MapReduce. It is suited to bounded jobs that have a known, fixed input and do not run continuously.
Apache Flink's unified approach to stream and batch processing means that a DataStream application executed over bounded input produces the same final result regardless of the configured execution mode. It is important to note what "final" means here: a job executed in STREAMING mode may produce incremental updates (think of upserts in a database), while a BATCH job produces only one final result at the end. Although the two modes compute the result differently, the final result is the same as long as it is interpreted correctly.
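The difference between incremental updates and a single final result can be illustrated with a small plain-Java sketch. It does not use any Flink API; the class and method names are made up for illustration. It only shows that applying the per-record updates as upserts (last value per key wins) ends in the same table as counting everything once at the end.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; none of these names are Flink classes.
final class IncrementalVsFinal {

    // "Streaming" style: emit an updated (word, count) pair after every input record.
    static List<Map.Entry<String, Integer>> streamingUpdates(List<String> words) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (String word : words) {
            int updated = counts.merge(word, 1, Integer::sum);
            emitted.add(Map.entry(word, updated));
        }
        return emitted;
    }

    // "Batch" style: consume all input first, then emit one count per word.
    static Map<String, Integer> batchResult(List<String> words) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String word : words) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    // Apply the streaming updates as upserts: the last update per key wins.
    static Map<String, Integer> applyUpserts(List<Map.Entry<String, Integer>> updates) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> update : updates) {
            table.put(update.getKey(), update.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "a");
        // Three updates are emitted in streaming style, two final counts in batch style.
        System.out.println(streamingUpdates(input).size());
        System.out.println(batchResult(input).size());
        // After applying the upserts, both styles agree on the final table.
        System.out.println(applyUpserts(streamingUpdates(input)).equals(batchResult(input)));
    }
}
```

For the input a, b, a the streaming style emits three updates, while the batch style emits two final counts. Both describe the same end state.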
By enabling BATCH execution, we allow Flink to apply additional optimizations that are only possible when we know the input is bounded. For example, different join/aggregation strategies can be used, as well as a different shuffle implementation that allows more efficient task scheduling and different failure recovery behavior. Some details of the execution behavior are described below.
When can/should I use BATCH execution mode?
BATCH execution mode can only be used for bounded jobs/Flink programs. Boundedness is a property of a data source: it tells us whether all the input from that source is known before execution, or whether new data may keep arriving, potentially indefinitely. A job is bounded if all of its sources are bounded, and unbounded otherwise.
STREAMING execution mode, on the other hand, can be used for both bounded and unbounded jobs.
As a rule of thumb, use BATCH execution mode when your program is bounded, because it is more efficient. When your program is unbounded, you must use STREAMING execution mode, because only this mode is general enough to handle continuous data streams.
One obvious exception is when you want to use a bounded job to bootstrap some job state that you then use in a subsequent unbounded job. For example, you run a bounded job in STREAMING mode, take a savepoint, and then restore that savepoint on an unbounded job. This is a very specific use case, and one that may soon become obsolete once a savepoint can be produced as an additional output of a BATCH job.
Another case where you may run a bounded job in STREAMING mode is when writing tests for code that will eventually run with unbounded sources. For testing, it can be more natural to use a bounded source in those cases.
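The rule of thumb above (leaving the two exceptions aside) can be written down as a short plain-Java sketch. The enum and method names are hypothetical stand-ins, not Flink classes. The sketch only encodes two statements from the text: a job is bounded when all of its sources are bounded, and a bounded job should prefer BATCH while an unbounded job must use STREAMING.

```java
import java.util.List;

// Plain-Java illustration of the rule of thumb; these are hypothetical names, not Flink classes.
final class ModeChooser {

    enum SourceKind { BOUNDED, UNBOUNDED }

    enum Mode { STREAMING, BATCH }

    // A job is bounded only if every one of its sources is bounded.
    static boolean isBoundedJob(List<SourceKind> sources) {
        return sources.stream().allMatch(kind -> kind == SourceKind.BOUNDED);
    }

    // Bounded jobs should prefer BATCH; unbounded jobs must use STREAMING.
    static Mode recommendedMode(List<SourceKind> sources) {
        return isBoundedJob(sources) ? Mode.BATCH : Mode.STREAMING;
    }

    public static void main(String[] args) {
        // A file-only job versus a job that mixes a file with a message queue.
        System.out.println(recommendedMode(List.of(SourceKind.BOUNDED)));
        System.out.println(recommendedMode(List.of(SourceKind.BOUNDED, SourceKind.UNBOUNDED)));
    }
}
```

A single unbounded source is enough to make the whole job unbounded, which is why the second call in `main` ends up with STREAMING.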
Configure BATCH execution mode
The execution mode can be configured via the execution.runtime-mode setting. There are three possible values:
- STREAMING: the classic DataStream execution mode (default)
- BATCH: batch-style execution on the DataStream API
- AUTOMATIC: let the system decide based on the boundedness of the sources
The setting can be passed as a command-line parameter of bin/flink run ..., or set programmatically when creating/configuring the StreamExecutionEnvironment.
Here is how to configure the execution mode on the command line:
$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
This example shows how to configure the execution mode in code:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
We recommend that users do not set the runtime mode in the program, but instead set it on the command line when submitting the application. Keeping the application code free of this configuration makes the program more flexible, because the same application can then be run in any execution mode.
Word count example
Word count in batch mode
Workflow
Core code
ParameterTool parameterFromArgs = ParameterTool.fromArgs(args);
String input = parameterFromArgs.getRequired("input");
// Initialize the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Load the data source
DataStreamSource<String> wordSource = env.readTextFile(input, "UTF-8");
// Transform each line into Word records
SingleOutputStreamOperator<Word> wordStreamOperator = wordSource.flatMap(new TokenizerFunction());
// Group by word
KeyedStream<Word, String> wordKeyedStream = wordStreamOperator.keyBy(new KeySelector<Word, String>() {
    @Override
    public String getKey(Word word) throws Exception {
        return word.getWord();
    }
});
// Sum the frequency field per word
SingleOutputStreamOperator<Word> sumStream = wordKeyedStream.sum("frequency");
- When running in an IDE, specify the -input parameter with the path of the input file.
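The core code above uses a Word type and a TokenizerFunction that are not shown. As a rough idea of what they might look like, here is a minimal plain-Java sketch. Everything in it is an assumption: the field names are guessed from getWord() and sum("frequency"), the splitting rule is a choice made for this example, and the tokenizer is a plain method rather than an implementation of Flink's FlatMapFunction. The Word class follows the POJO shape (public fields, public no-argument constructor) that Flink needs for field-name aggregations such as sum("frequency").

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-ins for the Word type and the tokenizing logic; not the article's actual classes.
final class TokenizerSketch {

    // POJO-style record of a word and how often it was seen.
    static final class Word {
        public String word;
        public int frequency;

        public Word() { }

        public Word(String word, int frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        public String getWord() {
            return word;
        }
    }

    // Lower-case the line, split on runs of non-word characters (ASCII rules),
    // and emit one Word with frequency 1 per non-empty token.
    static List<Word> tokenize(String line) {
        List<Word> out = new ArrayList<>();
        for (String token : line.toLowerCase(Locale.ROOT).split("\\W+")) {
            if (!token.isEmpty()) {
                out.add(new Word(token, 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        for (Word w : tokenize("Hello, hello world")) {
            System.out.println(w.word + " " + w.frequency);
        }
    }
}
```

Emitting a frequency of 1 per token leaves all counting to the keyed sum, so the tokenizer itself stays stateless.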
Word count in streaming mode
Workflow
Core code
// Initialize the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the Kafka data source
KafkaSource<String> source = KafkaSource.<String>builder()
        // Broker address, topic, and consumer group settings are not shown here
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
// Load the data source
DataStreamSource<String> kafkaWordSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Word Source");
// Transform each message into Word records
SingleOutputStreamOperator<Word> wordStreamOperator = kafkaWordSource.flatMap(new TokenizerFunction());
// Group by word
KeyedStream<Word, String> wordKeyedStream = wordStreamOperator.keyBy(new KeySelector<Word, String>() {
    @Override
    public String getKey(Word word) throws Exception {
        return word.getWord();
    }
});
// Sum the frequency field per word
SingleOutputStreamOperator<Word> sumStream = wordKeyedStream.sum("frequency");
Full code
com.example.datastream.wordcount.DataStreamApiWordCountBatch -- reads data from a file and counts words
com.example.datastream.wordcount.DataStreamApiWordCountStream -- consumes data from Kafka and counts words
Submit to a Flink cluster for execution:
bin/flink run -m <jobmanager-address> -c com.example.datastream.wordcount.DataStreamApiWordCountBatch /opt/apps/study-flink-1.0.jar -input /mnt/data/words.txt
-input specifies the input file path.