Getting Started with Flink: Word Count
2022-06-26 09:32:00 [山顶看数据]
Word count (wc) with batch processing
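import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/*
 * Batch processing (DataSet API)
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the input file line by line
        DataSource<String> data = env.readTextFile("input/a.txt");
        // Split each line into words and emit (word, 1) for every word
        FlatMapOperator<String, Tuple2<String, Long>> wordsTuple = data.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        // Note: with a Java lambda you must add .returns(Types.TUPLE(Types.STRING, Types.LONG)).
        // Because of generic type erasure, Flink cannot infer the lambda's generic output type,
        // so the type information has to be declared explicitly.
        // Group by the word (tuple field 0) and sum the counts (tuple field 1)
        UnsortedGrouping<Tuple2<String, Long>> groupWord = wordsTuple.groupBy(0);
        AggregateOperator<Tuple2<String, Long>> result = groupWord.sum(1);
        // In the DataSet API, print() triggers execution itself, so no env.execute() is needed
        result.print();
    }
}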
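The `.returns(...)` note in the batch job comes down to Java type erasure. The following plain-Java sketch (no Flink dependency; `ErasureDemo` and `describe` are hypothetical names) is a simplified analogy, not Flink's own type-extraction logic. An anonymous class records its generic interface arguments in its class file, so reflection can read them back, while the class generated for a lambda exposes only the raw interface.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

class ErasureDemo {
    // Anonymous class: the compiler stores Function<String, Integer> in the class signature
    static final Function<String, Integer> ANON = new Function<String, Integer>() {
        @Override
        public Integer apply(String s) {
            return s.length();
        }
    };

    // Lambda: the runtime-generated class implements only the raw Function interface
    static final Function<String, Integer> LAMBDA = s -> s.length();

    // Returns the parameterized interface type if reflection can see it, otherwise "raw"
    static String describe(Object f) {
        for (Type t : f.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return t.getTypeName();
            }
        }
        return "raw";
    }

    public static void main(String[] args) {
        System.out.println(describe(ANON));   // java.util.function.Function<java.lang.String, java.lang.Integer>
        System.out.println(describe(LAMBDA)); // raw
    }
}
```

This is also why the anonymous-class `MapFunction` in the streaming job needs no `.returns(...)`, while its commented-out lambda alternative does.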
Word count with bounded stream processing
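import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/*
 * Bounded stream processing (DataStream API)
 */
public class Word {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run every operator with a single parallel instance, so all results are printed by one task
        environment.setParallelism(1);
        // A file is a bounded source: the job finishes once the whole file has been read
        DataStream<String> source = environment.readTextFile("input/a.txt");
        // Split each line into words
        SingleOutputStreamOperator<String> flatMap = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        // Lambda alternative (needs .returns(...) because of type erasure):
        // SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(word -> Tuple2.of(word, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
        // Turn each word into (word, 1); the anonymous class keeps its generic types, so no .returns(...) is needed
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        });
        // Partition by the word; a KeySelector replaces the deprecated positional keyBy(0)
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = map.keyBy(value -> value.f0);
        // Running sum of tuple field 1 per key
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyedStream.sum(1);
        sum.print();
        // Unlike the DataSet API, a streaming job only runs when execute() is called
        environment.execute();
    }
}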
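Although the batch and streaming jobs read the same file, their output differs: the DataSet job prints one final count per word, whereas `keyBy(...).sum(1)` on a DataStream is a rolling aggregation that emits an updated `(word, count)` tuple for every incoming record. The plain-Java sketch below simulates both behaviours on a hypothetical three-word input; `RollingVsBatch`, `rolling` and `batch` are illustrative names, not Flink APIs, and the string formatting only imitates the `(f0,f1)` style of `Tuple2.toString()`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RollingVsBatch {
    // Streaming-style: keep per-key state and emit the updated count for every incoming word
    static List<String> rolling(List<String> words) {
        Map<String, Integer> state = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String w : words) {
            int count = state.merge(w, 1, Integer::sum);
            emitted.add("(" + w + "," + count + ")");
        }
        return emitted;
    }

    // Batch-style: aggregate the whole input first, then report one count per distinct word
    static Map<String, Integer> batch(List<String> words) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String w : words) {
            counts.merge(w, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> words = List.of("hello", "flink", "hello");
        System.out.println(rolling(words)); // [(hello,1), (flink,1), (hello,2)]
        System.out.println(batch(words));   // {hello=2, flink=1}
    }
}
```

In the streaming case the last tuple printed for each word carries its final count, which is why a bounded stream ends up with the same totals as the batch job.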
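The same bounded-stream job can also be written more compactly, emitting (word, 1) tuples directly from flatMap and chaining keyBy and sum:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/*
 * Bounded stream processing, written as a single operator chain
 */
public class Word {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        DataStream<String> source = environment.readTextFile("input/a.txt");
        // Emit (word, 1) directly from flatMap, then key by the word and sum the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }).keyBy(value -> value.f0).sum(1);
        result.print();
        environment.execute();
    }
}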
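All three jobs tokenize with `line.split(" ")`, which splits on a single space character only. With consecutive spaces the result contains empty strings (which then get counted as a "word"), and tabs are not treated as separators at all. A minimal plain-Java sketch of the difference follows; `Tokenize`, `naive` and `robust` are hypothetical helpers, and splitting on `\\s+` is one possible choice, not something the original jobs do. The body of `robust` could replace the split loop inside any of the `flatMap` functions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class Tokenize {
    // Same behaviour as the jobs above: split on one space character
    static List<String> naive(String line) {
        return Arrays.asList(line.split(" "));
    }

    // Split on runs of any whitespace and drop empty tokens
    static List<String> robust(String line) {
        List<String> out = new ArrayList<>();
        for (String w : line.trim().split("\\s+")) {
            if (!w.isEmpty()) {
                out.add(w);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(naive("hello  flink"));  // [hello, , flink]
        System.out.println(robust("hello  flink")); // [hello, flink]
    }
}
```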