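Flink Series: Reduce
2020-11-09 12:17:00 [不会飞的小龙人]
The Reduce operator performs a rolling aggregation over a keyed data stream and emits the merged result after every aggregation step.
Example environment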
java.version: 1.8.x
flink.version: 1.11.1
Example data source (project available for download on Gitee)
Reduce.java
import com.flink.examples.DataSource;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
/**
 * @Description Reduce operator: performs a rolling aggregation over the data stream
 * and returns the merged result after each aggregation step
 */
public class Reduce {
    /**
     * Iterates over the collection and prints every rolling-aggregation result, per partition
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        List<Tuple3<String,String,Integer>> tuple3List = DataSource.getTuple3ToList();
        // Note: keying by Integer gave incorrect partition results here; converting the key to String produces the expected output
        KeyedStream<Tuple3<String,String,Integer>, String> keyedStream = env.fromCollection(tuple3List).keyBy(new KeySelector<Tuple3<String,String,Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> tuple3) throws Exception {
                // f1 is the gender field; records with the same f1 value (gender) go to the same partition
                return String.valueOf(tuple3.f1);
            }
        });
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> result = keyedStream.reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> t0, Tuple3<String, String, Integer> t1) throws Exception {
                // t0 is the accumulated result so far for this key, t1 is the newly arrived record
                int totalAge = t0.f2 + t1.f2;
                // The name field is left empty because the result represents a running total, not a single person
                return new Tuple3<>("", t0.f1, totalAge);
            }
        });
        result.print();
        env.execute("flink Reduce job");
    }
}
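Printed output
## Note: the first record of each partition still has every field populated because the rolling aggregation only starts when the second record arrives and is combined with the first. The first record is emitted unchanged without any call to reduce; it only shows up later as the first argument (t0) of the first reduce call.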
2> (张三,man,20)
2> (,man,49)
2> (,man,79)
4> (李四,girl,24)
4> (,girl,56)
4> (,girl,74)
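The rolling behaviour described in the note can be reproduced without Flink. The following is a minimal plain-Java sketch, not Flink's actual implementation: `RollingReduceSketch`, `Person`, `rollingReduce`, and `run` are hypothetical names introduced here, and the input records are assumed (the ages are inferred from the differences in the printed output above; the names are ASCII placeholders, since the real `DataSource` contents are not shown). It keeps one accumulated value per key, emits the first record of a key unchanged, and calls the reducer only from the second record onward.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class RollingReduceSketch {

    /** Hypothetical stand-in for Tuple3<String, String, Integer>: name, gender, age. */
    static final class Person {
        final String name;
        final String gender;
        final int age;

        Person(String name, String gender, int age) {
            this.name = name;
            this.gender = gender;
            this.age = age;
        }

        @Override
        public String toString() {
            return "(" + name + "," + gender + "," + age + ")";
        }
    }

    /**
     * Emulates a keyed rolling reduce, keyed by gender.
     * The first record seen for a key is stored and emitted as-is;
     * every later record is merged with the stored value via the reducer,
     * and the merged value is both stored and emitted.
     */
    static List<String> rollingReduce(List<Person> input, BinaryOperator<Person> reducer) {
        Map<String, Person> stateByKey = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (Person p : input) {
            Person current = stateByKey.get(p.gender);
            // No stored value yet: the reducer is NOT called for the first record of a key
            Person next = (current == null) ? p : reducer.apply(current, p);
            stateByKey.put(p.gender, next);
            emitted.add(next.toString());
        }
        return emitted;
    }

    /** Runs the sketch on assumed sample data and returns the emitted lines. */
    static List<String> run() {
        List<Person> input = new ArrayList<>();
        // Assumed sample data: placeholder names, ages inferred from the printed output
        input.add(new Person("zhangsan", "man", 20));
        input.add(new Person("lisi", "girl", 24));
        input.add(new Person("p3", "man", 29));
        input.add(new Person("p4", "girl", 32));
        input.add(new Person("p5", "man", 30));
        input.add(new Person("p6", "girl", 18));
        // Same merge logic as the ReduceFunction in the article: sum ages, blank out the name
        return rollingReduce(input, (t0, t1) -> new Person("", t0.gender, t0.age + t1.age));
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because this sketch runs in a single thread, the lines come out in input order and carry no `2>` or `4>` subtask prefix; in the Flink job the prefix identifies the parallel subtask that printed the record.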
Copyright notice
This article was written by [不会飞的小龙人]. Please include the original link when reposting. Thank you.
https://my.oschina.net/u/437309/blog/4709286