Flink Series: Reduce
2020-11-09 12:17:00 【不会飞的小龙人】
Reduce operator: performs a rolling aggregation over a keyed data stream and emits the combined result after every aggregation step.
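The following is a minimal plain-Java sketch of what "rolling" means. It has no Flink dependency, so it can be run on its own, and it assumes one accumulated value per key. The first element of a key is stored and emitted unchanged. Every later element is combined as `reducer(accumulated, newElement)`, and that result is emitted as well. The class `RollingReduceSketch`, the `Rec` type and the sample data are hypothetical. Flink keeps this per-key value in its keyed state, not in a `HashMap`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical illustration of keyed rolling-reduce semantics (not Flink's actual implementation)
public class RollingReduceSketch {

    // Hypothetical record type: a key plus an integer value
    static final class Rec {
        final String key;
        final int value;
        Rec(String key, int value) { this.key = key; this.value = value; }
        @Override public String toString() { return "(" + key + "," + value + ")"; }
    }

    // Emits one result per input element, like reduce() on a keyed stream
    static <T, K> List<T> rollingReduce(List<T> input, Function<T, K> keyOf, BinaryOperator<T> reducer) {
        Map<K, T> state = new HashMap<>();   // last accumulated value per key
        List<T> emitted = new ArrayList<>();
        for (T value : input) {
            K key = keyOf.apply(value);
            T acc = state.get(key);
            // First element of a key is stored and emitted as-is;
            // later elements are combined as reducer(accumulated, newElement)
            T next = (acc == null) ? value : reducer.apply(acc, value);
            state.put(key, next);
            emitted.add(next);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Rec> input = Arrays.asList(
                new Rec("a", 10), new Rec("b", 5), new Rec("a", 3), new Rec("b", 7));
        List<Rec> out = rollingReduce(input, r -> r.key, (acc, r) -> new Rec(acc.key, acc.value + r.value));
        System.out.println(out); // [(a,10), (b,5), (a,13), (b,12)]
    }
}
```

Each input element yields exactly one output element. This is the difference from a batch-style reduce (for example `java.util.stream.Stream.reduce`), which returns only the final value.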
Example environment
java.version: 1.8.x
flink.version: 1.11.1
Example data source (download the project from Gitee)
Reduce.java
import com.flink.examples.DataSource;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
/**
 * @Description Reduce operator: performs a rolling aggregation over a keyed data stream and emits the combined result after every aggregation step
 */
public class Reduce {
    /**
     * Iterates over the collection and prints the result of every rolling aggregation, per partition
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
        // Note: in the author's test, keying by an Integer gave incorrect partitioning; returning the key as a String produced the correct output
        KeyedStream<Tuple3<String, String, Integer>, String> keyedStream = env.fromCollection(tuple3List).keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> tuple3) throws Exception {
                // f1 is the gender field; records with the same f1 value (gender) go to the same partition
                return String.valueOf(tuple3.f1);
            }
        });
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> result = keyedStream.reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> t0, Tuple3<String, String, Integer> t1) throws Exception {
                // t0: the accumulated result so far for this key; t1: the newly arrived record
                int totalAge = t0.f2 + t1.f2;
                // Clear the name, keep the gender, and emit the running age total
                return new Tuple3<>("", t0.f1, totalAge);
            }
        });
        result.print();
        env.execute("flink Reduce job");
    }
}
Output
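## Note: the first record of each key has all of its fields populated because the rolling aggregation only begins with the second record, which is combined with the first. The first record therefore never enters the reduce method and is emitted unchanged. The "2>" and "4>" prefixes are the (1-based) index of the parallel subtask that printed the line.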
2> (张三,man,20)
2> (,man,49)
2> (,man,79)
4> (李四,girl,24)
4> (,girl,56)
4> (,girl,74)
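The note can be checked against the printed totals. Each emitted age is the previous total plus the new record's `f2`, so the differences between consecutive lines imply the input ages for each key: 20, 29, 30 for `man` and 24, 32, 18 for `girl`. These values are inferred from the output, not read from `DataSource`, and the sketch also assumes the records of each key arrived in that order. The plain-Java replay below applies the same logic as the article's `reduce(t0, t1)` to those inferred values. `ReduceReplay` and `Person` are hypothetical stand-ins for the job and for `Tuple3`. The names of the second and third records cannot be recovered, so they are filled with `"?"`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Replays the article's ReduceFunction on ages inferred from the printed output (hypothetical input)
public class ReduceReplay {

    // Stand-in for Tuple3<String, String, Integer>: f0 = name, f1 = gender, f2 = age
    static final class Person {
        final String f0;
        final String f1;
        final int f2;
        Person(String f0, String f1, int f2) { this.f0 = f0; this.f1 = f1; this.f2 = f2; }
        @Override public String toString() { return "(" + f0 + "," + f1 + "," + f2 + ")"; }
    }

    // Same logic as the article's reduce(t0, t1): clear the name, keep the gender, sum the ages
    static Person reduce(Person t0, Person t1) {
        return new Person("", t0.f1, t0.f2 + t1.f2);
    }

    // Rolling reduce over the records of a single key
    static List<Person> replay(List<Person> sameKey) {
        List<Person> out = new ArrayList<>();
        Person acc = null;
        for (Person p : sameKey) {
            acc = (acc == null) ? p : reduce(acc, p);  // the first record bypasses reduce
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        // "?" marks names that are not recoverable from the output
        List<Person> man = Arrays.asList(
                new Person("张三", "man", 20), new Person("?", "man", 29), new Person("?", "man", 30));
        List<Person> girl = Arrays.asList(
                new Person("李四", "girl", 24), new Person("?", "girl", 32), new Person("?", "girl", 18));
        System.out.println(replay(man));   // [(张三,man,20), (,man,49), (,man,79)]
        System.out.println(replay(girl));  // [(李四,girl,24), (,girl,56), (,girl,74)]
    }
}
```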
Copyright notice
This article was written by [不会飞的小龙人]. Please include a link to the original when reposting. Thanks.
https://my.oschina.net/u/437309/blog/4709286