Flink Series: Reduce
2020-11-09 12:17:00 【不会飞的小龙人】
Reduce operator: performs a rolling aggregation over a keyed data stream and emits the merged result after every aggregation step.
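To make "rolling aggregation" concrete, here is a simplified, single-threaded sketch in plain Java (no Flink dependency) of what a keyed rolling reduce does. It is not Flink's implementation; the class and method names (`RollingReduceSketch`, `rollingReduce`) are hypothetical. The idea it models: one accumulated value is kept per key, and every incoming element updates that value and emits it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical illustration class, not part of Flink.
public class RollingReduceSketch {

    // Keeps one accumulated value per key and emits the updated value for every input element.
    static <T, K> List<T> rollingReduce(List<T> input, Function<T, K> keySelector, BinaryOperator<T> reducer) {
        Map<K, T> accumulated = new HashMap<>();
        List<T> emitted = new ArrayList<>();
        for (T value : input) {
            K key = keySelector.apply(value);
            T previous = accumulated.get(key);
            // The first element of a key is stored and emitted unchanged;
            // the reducer is only called from the key's second element onward.
            T current = (previous == null) ? value : reducer.apply(previous, value);
            accumulated.put(key, current);
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Key by parity (odd/even) and keep a running sum per key.
        List<Integer> result = rollingReduce(Arrays.asList(1, 2, 3, 4, 5), n -> n % 2, Integer::sum);
        System.out.println(result); // prints [1, 2, 4, 6, 9]
    }
}
```

The odd key (1, 3, 5) emits 1, 4, 9 and the even key (2, 4) emits 2, 6, so five inputs produce five outputs while the reducer runs only three times.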
Example environment
java.version: 1.8.x
flink.version: 1.11.1
Sample data source (download the project from Gitee)
Reduce.java
import com.flink.examples.DataSource;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
/**
* @Description Reduce operator: performs a rolling aggregation over the data stream and returns the merged result of each aggregation step
*/
public class Reduce {
/**
* Iterates over the collection and prints, per partition, the result of every rolling aggregation
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
List<Tuple3<String,String,Integer>> tuple3List = DataSource.getTuple3ToList();
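//Note: the author found that keying by an Integer gave unexpected partitioning, and that returning the key as a String produced the expected output.
//Keep in mind that several distinct keys can legitimately share one subtask, because keys are hashed into key groups.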
KeyedStream<Tuple3<String,String,Integer>, String> keyedStream = env.fromCollection(tuple3List).keyBy(new KeySelector<Tuple3<String,String,Integer>, String>() {
@Override
public String getKey(Tuple3<String, String, Integer> tuple3) throws Exception {
//f1 is the gender field; records with the same f1 value (gender) go to the same partition
return String.valueOf(tuple3.f1);
}
});
SingleOutputStreamOperator<Tuple3<String, String, Integer>> result = keyedStream.reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
@Override
public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> t0, Tuple3<String, String, Integer> t1) throws Exception {
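//t0: result accumulated so far for this key; t1: the newly arrived record
int totalAge = t0.f2 + t1.f2;
//Clear the name, keep the gender (the key) and sum the ages
return new Tuple3<>("", t0.f1, totalAge);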
}
});
result.print();
env.execute("flink Reduce job");
}
}
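Output
## Note: the first record of each key (here, each gender) is printed with all of its fields intact because the rolling aggregation only starts at the key's second record, which is combined with the first. The first record never passes through the reduce method, so its name is not cleared. The 2> and 4> prefixes are the indexes of the parallel subtasks that printed each line.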
2> (张三,man,20)
2> (,man,49)
2> (,man,79)
4> (李四,girl,24)
4> (,girl,56)
4> (,girl,74)
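To check the explanation above against the printed output, the sketch below replays six records through a plain-Java model of the same per-gender rolling reduce and counts how often the reduce logic runs. The input is hypothetical: the article's data set is not shown, so apart from the first record per gender ("Zhang San" and "Li Si" standing in for 张三 and 李四), the names and ages are invented so that the running sums match the output. `ReduceReplaySketch`, `Person`, `replay` and `reduceCalls` are illustrative names, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class, not part of Flink.
public class ReduceReplaySketch {

    // Hypothetical stand-in for Tuple3<String, String, Integer>: (name, gender, age).
    static final class Person {
        final String name;
        final String gender;
        final int age;

        Person(String name, String gender, int age) {
            this.name = name;
            this.gender = gender;
            this.age = age;
        }

        @Override
        public String toString() {
            return "(" + name + "," + gender + "," + age + ")";
        }
    }

    // Counts how often the reduce logic runs.
    static int reduceCalls = 0;

    // Same logic as the article's ReduceFunction: clear the name, keep the gender, sum the ages.
    static Person reduce(Person t0, Person t1) {
        reduceCalls++;
        return new Person("", t0.gender, t0.age + t1.age);
    }

    // Replays the input in order, keyed by gender, and returns every emitted record.
    static List<Person> replay(List<Person> input) {
        Map<String, Person> accumulatedPerGender = new HashMap<>();
        List<Person> emitted = new ArrayList<>();
        for (Person p : input) {
            Person previous = accumulatedPerGender.get(p.gender);
            // The first record of a gender skips reduce() and is emitted as-is.
            Person current = (previous == null) ? p : reduce(previous, p);
            accumulatedPerGender.put(p.gender, current);
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Hypothetical input chosen so that the running sums match the printed output.
        List<Person> input = Arrays.asList(
                new Person("Zhang San", "man", 20),
                new Person("Li Si", "girl", 24),
                new Person("A", "man", 29),
                new Person("B", "girl", 32),
                new Person("C", "man", 30),
                new Person("D", "girl", 18));
        for (Person p : replay(input)) {
            System.out.println(p);
        }
        System.out.println("reduce calls: " + reduceCalls);
    }
}
```

It prints the six records in arrival order followed by `reduce calls: 4`: each gender has three records but only two reduce invocations. Flink emits the same values, but each line is prefixed with the index of the subtask that handled that gender.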
Copyright notice
This article was written by [不会飞的小龙人]. Please include a link to the original when reposting. Thanks.
https://my.oschina.net/u/437309/blog/4709286