Getting started with Flink - word count
2022-06-26 09:44:00 【Look at the data at the top of the mountain】
Word count using batch processing
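import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/*
 * Batch processing
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the input file line by line
        DataSource<String> data = env.readTextFile("input/a.txt");
        // Split each line into words and emit a (word, 1) tuple per word
        FlatMapOperator<String, Tuple2<String, Long>> wordsTuple = data.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        // Note: when a Java lambda expression is used here, you need to add
        // .returns(Types.TUPLE(Types.STRING, Types.LONG)). The lambda uses Java generics,
        // and because of generic type erasure the type information must be declared explicitly.
        // Group by the word (tuple field 0) and sum the counts (tuple field 1)
        UnsortedGrouping<Tuple2<String, Long>> groupWord = wordsTuple.groupBy(0);
        AggregateOperator<Tuple2<String, Long>> result = groupWord.sum(1);
        result.print();
    }
}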
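The comment about .returns(...) in the batch example refers to Java's generic type erasure. The following is a minimal plain-Java sketch of that effect, independent of Flink; the class name TypeErasureDemo and the method sameRuntimeClass are made up for this illustration. It only shows that generic type arguments are not present on the runtime class, which is the general reason a framework cannot always recover the output type of a lambda on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the Flink job above.
public class TypeErasureDemo {
    // Returns true when two differently parameterized lists share one runtime class.
    public static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Long> longs = new ArrayList<>();
        // After compilation both are plain ArrayList; the element type is erased.
        return strings.getClass() == longs.getClass();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());
    }
}
```

Running main prints true: at runtime nothing distinguishes a list of String from a list of Long, so the element type has to be supplied some other way, as the .returns(...) call does in the job above.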
Word count using bounded stream processing
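import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/*
 * Bounded stream processing
 */
public class Word {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        // A file is a bounded source: the stream ends when the file has been read
        DataStream<String> source = environment.readTextFile("input/a.txt");
        // Split each line into individual words
        SingleOutputStreamOperator<String> flatMap = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        // Equivalent lambda form; it needs an explicit .returns(...) because of type erasure:
        // SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(word -> Tuple2.of(word, 1)).returns(Types.TUPLE(Types.STRING,Types.INT));
        // Turn each word into a (word, 1) tuple
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        });
        // Key by the word (tuple field 0) and sum the counts (tuple field 1).
        // keyBy(int) is deprecated in newer Flink releases, where a key selector is preferred.
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = map.keyBy(0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyedStream.sum(1);
        sum.print();
        // Streaming jobs only run once execute() is called
        environment.execute();
    }
}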
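import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/*
 * Bounded stream processing, compact version:
 * flatMap emits (word, 1) tuples directly and the operators are chained
 */
public class Word {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        DataStream<String> source = environment.readTextFile("input/a.txt");
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }).keyBy(0).sum(1); // key by the word, then sum the counts
        result.print();
        environment.execute();
    }
}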
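One difference between the batch and streaming versions is worth spelling out. In Flink's default streaming execution mode, a keyed sum emits an updated total for every incoming record, whereas the batch job prints one final count per word. The plain-Java sketch below imitates that per-record behaviour with a HashMap standing in for keyed state; the class RunningCountSketch and the method runningCounts are hypothetical names used only for this illustration, and the code does not call Flink at all.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of per-record keyed aggregation; not Flink code.
public class RunningCountSketch {
    // Emits one "(word,count)" string per input word, like a per-record keyed sum.
    public static List<String> runningCounts(String line) {
        Map<String, Integer> state = new HashMap<>(); // stands in for keyed state
        List<String> emitted = new ArrayList<>();
        for (String word : line.split(" ")) {
            // Add 1 to the stored count for this word (starting from 1 if absent)
            int updated = state.merge(word, 1, Integer::sum);
            emitted.add("(" + word + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        runningCounts("hello flink hello").forEach(System.out::println);
    }
}
```

For the input "hello flink hello" this prints (hello,1), (flink,1) and (hello,2) on separate lines: the word hello appears twice in the output, once per update, which is the kind of output to expect from sum.print() in the streaming examples.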