Flink Series: Reduce
2020-11-09 12:17:00 【不会飞的小龙人】
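The Reduce operator performs a rolling aggregation on a keyed data stream and emits the merged result after every aggregation step.
Example environment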
java.version: 1.8.x
flink.version: 1.11.1
Example data source (download the project from Gitee)
Reduce.java
import com.flink.examples.DataSource;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;

/**
 * @Description Reduce operator: performs a rolling aggregation on a keyed data stream
 * and emits the merged result after every aggregation step.
 */
public class Reduce {

    /**
     * Iterates over the collection and prints, per partition, the result of each rolling aggregation step.
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Four parallel subtasks; the "N>" prefix in the printed output is the subtask index.
        env.setParallelism(4);
        List<Tuple3<String,String,Integer>> tuple3List = DataSource.getTuple3ToList();
        // Note (author's observation): keying by an Integer produced unexpected partition output;
        // returning the key as a String gave the expected output.
        KeyedStream<Tuple3<String,String,Integer>, String> keyedStream = env.fromCollection(tuple3List).keyBy(new KeySelector<Tuple3<String,String,Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> tuple3) throws Exception {
                // f1 is the gender field; records with the same f1 value (gender) share a partition.
                return String.valueOf(tuple3.f1);
            }
        });
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> result = keyedStream.reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> t0, Tuple3<String, String, Integer> t1) throws Exception {
                // t0 is the aggregate so far for this key, t1 is the newly arrived record.
                int totalAge = t0.f2 + t1.f2;
                // The name field is left empty because the result represents several records.
                return new Tuple3<>("", t0.f1, totalAge);
            }
        });
        result.print();
        env.execute("flink Reduce job");
    }
}
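Printed output
## Note: the first record of each partition (key) still has a value in every field because the rolling aggregation only starts when the second record arrives and is combined with the first. The first record is emitted unchanged and never passes through the reduce method. The "2>" and "4>" prefixes are the indices of the parallel subtasks that printed each line.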
2> (张三,man,20)
2> (,man,49)
2> (,man,79)
4> (李四,girl,24)
4> (,girl,56)
4> (,girl,74)
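The rolling behaviour described in the note can be reproduced without a Flink cluster. The following is a minimal plain-Java sketch, not Flink's actual implementation: it keeps one aggregate per key in a map, emits the first record of a key unchanged, and calls the reduce function only from the second record onward. The class `RollingReduceSketch`, the `Person` type, the romanized names, and the sample records are assumptions made for illustration; the ages were chosen so that the per-key running totals match the printed output above, since the contents of `DataSource.getTuple3ToList()` are not shown in the article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class RollingReduceSketch {

    /** Hypothetical stand-in for Tuple3<String, String, Integer> (name, gender, age). */
    public static final class Person {
        public final String name;
        public final String gender;
        public final int age;

        public Person(String name, String gender, int age) {
            this.name = name;
            this.gender = gender;
            this.age = age;
        }

        @Override
        public String toString() {
            return "(" + name + "," + gender + "," + age + ")";
        }
    }

    /** Same logic as the ReduceFunction in the article: blank the name, sum the ages. */
    public static final BinaryOperator<Person> SUM_AGES =
            (agg, next) -> new Person("", agg.gender, agg.age + next.age);

    /**
     * Emulates keyBy(gender).reduce(fn) on a bounded list, single-threaded.
     * Returns every emitted value in input order.
     */
    public static List<String> rollingReduce(List<Person> input, BinaryOperator<Person> fn) {
        Map<String, Person> stateByKey = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (Person record : input) {
            Person previous = stateByKey.get(record.gender);
            // First record of a key: stored and emitted as-is, fn is NOT called.
            Person current = (previous == null) ? record : fn.apply(previous, record);
            stateByKey.put(record.gender, current);
            emitted.add(current.toString());
        }
        return emitted;
    }

    /** Assumed sample data; only the ages are inferred from the article's output. */
    public static List<Person> sampleData() {
        return Arrays.asList(
                new Person("Zhang San", "man", 20),
                new Person("Li Si", "girl", 24),
                new Person("p3", "man", 29),
                new Person("p4", "girl", 32),
                new Person("p5", "girl", 18),
                new Person("p6", "man", 30));
    }

    public static void main(String[] args) {
        rollingReduce(sampleData(), SUM_AGES).forEach(System.out::println);
    }
}
```

With this assumed data, the `man` key emits running totals of 20, 49 and 79 and the `girl` key emits 24, 56 and 74, matching the totals in the printed output. Because the sketch runs in a single thread, lines appear in input order rather than grouped by subtask.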
Copyright notice
This article was written by [不会飞的小龙人]. Please include a link to the original when reposting. Thank you.
https://my.oschina.net/u/437309/blog/4709286