Flink Series: Reduce
2020-11-09 12:17:00 【不会飞的小龙人】
Reduce operator: performs a rolling aggregation on a keyed data stream and emits the merged result after every aggregation step.
Example environment
java.version: 1.8.x
flink.version: 1.11.1
Example data source (download the project from Gitee)
Reduce.java
import com.flink.examples.DataSource;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
/**
 * Reduce operator: performs a rolling aggregation on a keyed data stream and
 * emits the merged result after every aggregation step.
 */
public class Reduce {
    /**
     * Iterates over the collection and prints, per partition, the result of each rolling aggregation step.
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        List<Tuple3<String,String,Integer>> tuple3List = DataSource.getTuple3ToList();
        // Note: the author found that keying by an Integer gave unexpected partition results;
        // returning the key as a String produced the expected output.
        KeyedStream<Tuple3<String,String,Integer>, String> keyedStream = env.fromCollection(tuple3List)
                .keyBy(new KeySelector<Tuple3<String,String,Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> tuple3) throws Exception {
                // f1 is the gender field; records with the same f1 value go to the same partition
                return String.valueOf(tuple3.f1);
            }
        });
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> result = keyedStream.reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> t0, Tuple3<String, String, Integer> t1) throws Exception {
                // t0 is the aggregate so far for this key, t1 is the newly arrived record
                int totalAge = t0.f2 + t1.f2;
                // The name field is left empty: the result is a running total, not a single person
                return new Tuple3<>("", t0.f1, totalAge);
            }
        });
        result.print();
        env.execute("flink Reduce job");
    }
}
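Printed output
## Note: the first record in each partition still has every field populated because the rolling aggregation only starts with the second record, which is combined with the first. The first record for a key never enters the reduce method; it is emitted unchanged.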
2> (张三,man,20)
2> (,man,49)
2> (,man,79)
4> (李四,girl,24)
4> (,girl,56)
4> (,girl,74)
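The behavior described in the note can be reproduced without Flink. The following is a minimal sketch in plain Java, not Flink's actual implementation: `Person`, `rollingReduce`, `sumAges`, `sampleInput`, and `demo` are hypothetical names introduced for illustration. The ages are inferred from the printed output above (20, 29, 30 for man; 24, 32, 18 for girl); the names `p3` to `p6` and the arrival order across keys are assumptions, since the real data source is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class RollingReduceSketch {

    /** Hypothetical stand-in for Tuple3<String, String, Integer>: (name, gender, age). */
    public static final class Person {
        public final String name;
        public final String gender;
        public final int age;

        public Person(String name, String gender, int age) {
            this.name = name;
            this.gender = gender;
            this.age = age;
        }

        @Override
        public String toString() {
            return "(" + name + "," + gender + "," + age + ")";
        }
    }

    /** Same logic as the ReduceFunction in the article: sum the ages, blank the name. */
    public static Person sumAges(Person t0, Person t1) {
        return new Person("", t0.gender, t0.age + t1.age);
    }

    /**
     * Emulates a keyed rolling reduce, keyed by gender. One running value is
     * kept per key, and one result is emitted for every incoming record.
     */
    public static List<String> rollingReduce(List<Person> input, BinaryOperator<Person> reducer) {
        Map<String, Person> statePerKey = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (Person current : input) {
            Person previous = statePerKey.get(current.gender);
            // First record for a key: the reducer is NOT called, the record is emitted as-is.
            Person result = (previous == null) ? current : reducer.apply(previous, current);
            statePerKey.put(current.gender, result);
            emitted.add(result.toString());
        }
        return emitted;
    }

    /** Assumed input: ages inferred from the article's output, names p3..p6 are placeholders. */
    public static List<Person> sampleInput() {
        return Arrays.asList(
                new Person("张三", "man", 20),
                new Person("李四", "girl", 24),
                new Person("p3", "man", 29),
                new Person("p4", "girl", 32),
                new Person("p5", "man", 30),
                new Person("p6", "girl", 18));
    }

    public static List<String> demo() {
        return rollingReduce(sampleInput(), RollingReduceSketch::sumAges);
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Each key produces the same three values as in the Flink run: the untouched first record, then the running totals 49 and 79 for man, 56 and 74 for girl. The sketch prints no `2>` or `4>` prefix because it has no parallel subtasks; in Flink that prefix identifies the subtask that handled the key.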
Copyright notice
This article was written by [不会飞的小龙人]. Please include a link to the original when reposting. Thank you.
https://my.oschina.net/u/437309/blog/4709286