Getting Started with Flink: Word Count
2022-06-26 09:32:00 【山顶看数据】
Word count with batch processing
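/*
 * Batch processing
 */
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the input file line by line
        DataSource<String> data = env.readTextFile("input/a.txt");
        // Split each line on spaces and emit a (word, 1) pair per word
        FlatMapOperator<String, Tuple2<String, Long>> wordsTuple = data.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        // Note: when a Java lambda uses generic types, you must add
        // .returns(Types.TUPLE(Types.STRING, Types.LONG)). Because of type erasure,
        // the generic type information has to be declared explicitly.
        // Group by the word (field 0), then sum the counts (field 1)
        UnsortedGrouping<Tuple2<String, Long>> groupWord = wordsTuple.groupBy(0);
        AggregateOperator<Tuple2<String, Long>> result = groupWord.sum(1);
        // In the DataSet API, print() triggers execution, so no env.execute() call is needed
        result.print();
    }
}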
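The need for `.returns(...)` in the batch example comes down to how the JVM treats lambdas. The following is a minimal plain-Java sketch (no Flink involved) of that underlying behaviour, not of Flink's actual type extraction logic. `ErasureDemo` and `keepsTypeArguments` are hypothetical names made up for this illustration, and `Map.Entry<String, Long>` merely stands in for `Tuple2<String, Long>`. On a typical HotSpot JVM, the anonymous class should report that its type arguments are still available through reflection, while the lambda should not.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.function.Function;

public class ErasureDemo {

    /**
     * Returns true if the object's class still records generic type
     * arguments for the first interface it implements.
     */
    public static boolean keepsTypeArguments(Object function) {
        Type[] interfaces = function.getClass().getGenericInterfaces();
        return interfaces.length > 0 && interfaces[0] instanceof ParameterizedType;
    }

    public static void main(String[] args) {
        // Anonymous class: the compiler stores Function<String, Map.Entry<String, Long>>
        // in the class file, so reflection can read the type arguments back.
        Function<String, Map.Entry<String, Long>> anonymous =
                new Function<String, Map.Entry<String, Long>>() {
                    @Override
                    public Map.Entry<String, Long> apply(String word) {
                        return new SimpleEntry<>(word, 1L);
                    }
                };

        // Lambda: the class generated at runtime implements the raw Function
        // interface, so the type arguments are not recoverable this way.
        Function<String, Map.Entry<String, Long>> lambda = word -> new SimpleEntry<>(word, 1L);

        System.out.println("anonymous class keeps type arguments: " + keepsTypeArguments(anonymous));
        System.out.println("lambda keeps type arguments: " + keepsTypeArguments(lambda));
    }
}
```

This is also why the streaming examples that use anonymous `FlatMapFunction` and `MapFunction` classes work without a `.returns(...)` hint.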
Word count with bounded stream processing
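/*
 * Bounded stream processing
 */
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Word {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use a single parallel task so the printed output is easy to follow
        environment.setParallelism(1);
        // A file is a bounded source: the stream ends when the file has been read
        DataStream<String> source = environment.readTextFile("input/a.txt");
        // Split each line on spaces and emit one record per word
        SingleOutputStreamOperator<String> flatMap = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        // Equivalent lambda form; it needs the .returns(...) type hint:
        // SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(word -> Tuple2.of(word, 1)).returns(Types.TUPLE(Types.STRING,Types.INT));
        // Turn each word into a (word, 1) pair
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        });
        // Key by the word (field 0). Newer Flink versions deprecate positional keys
        // in favor of a KeySelector such as t -> t.f0.
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = map.keyBy(0);
        // Keep a running sum of the counts (field 1) per word
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyedStream.sum(1);
        sum.print();
        // Unlike the batch version, a streaming job only runs once execute() is called
        environment.execute();
    }
}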
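One difference from the batch version is worth seeing concretely. Assuming the job runs in Flink's default streaming execution mode, `keyedStream.sum(1)` emits an updated count every time a word arrives, so the same word is printed several times with a growing count instead of once with its final total. The plain-Java sketch below only simulates that behaviour with a `HashMap` as the per-key state. `RunningCountSketch`, `runningCounts`, and the sample lines are hypothetical and stand in for the real job and for `input/a.txt`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningCountSketch {

    /** Emits one "word=count" record per input word, like a rolling per-key sum. */
    public static List<String> runningCounts(List<String> lines) {
        Map<String, Integer> state = new HashMap<>(); // running total per word
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                // Update the state for this key and emit the new total immediately
                int updated = state.merge(word, 1, Integer::sum);
                emitted.add(word + "=" + updated);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Hypothetical sample input
        List<String> lines = Arrays.asList("hello flink", "hello world");
        System.out.println(runningCounts(lines));
        // prints [hello=1, flink=1, hello=2, world=1]
    }
}
```

The batch program, by contrast, prints each word once with its final count after all input has been read.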
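/*
 * Bounded stream processing (compact version: flatMap emits (word, 1) pairs
 * directly, and the operators are chained into one expression)
 */
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Word {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        DataStream<String> source = environment.readTextFile("input/a.txt");
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                // Split the line on spaces and emit a (word, 1) pair per word
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }).keyBy(0).sum(1); // key by the word (field 0), keep a running sum of field 1
        result.print();
        environment.execute();
    }
}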
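All three programs tokenize with `line.split(" ")`. That works for input where words are separated by exactly one space, but with repeated or leading spaces it produces empty strings, which would then be counted as a word. The sketch below shows one possible, more tolerant tokenizer in plain Java. `TokenizeSketch` and `tokenize` are hypothetical names, and whether you need this depends on what your input file looks like.

```java
import java.util.ArrayList;
import java.util.List;

public class TokenizeSketch {

    /** Splits a line on runs of whitespace and drops empty tokens. */
    public static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            // An empty or blank line still yields one empty token, so filter it out
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // Two spaces between the words: split(" ") yields an empty token in the middle
        System.out.println("a  b".split(" ").length); // prints 3
        System.out.println(tokenize("a  b"));         // prints [a, b]
        System.out.println(tokenize(""));             // prints []
    }
}
```

Inside the `flatMap` functions above, the loop over `line.split(" ")` could iterate over such a helper's result instead.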