Flink Series: Reduce
2020-11-09 12:17:00 【不会飞的小龙人】
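Reduce operator: applies a rolling aggregation to a keyed data stream. Each incoming record is combined with the last reduced value for its key, and the merged result is emitted after every step.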
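To make the rolling semantics concrete, here is a minimal sketch in plain Java with no Flink dependency. It models the behavior described above and is not Flink's implementation. `RollingReduceSketch` and `rollingReduce` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java model of a keyed rolling reduce (illustration only, not Flink code).
public class RollingReduceSketch {

    // Emits one result per input record. The first record of a key is stored and
    // emitted unchanged. Each later record is reduced with the stored value.
    // Assumes records are non-null, because null here means "no value yet".
    public static <T, K> List<T> rollingReduce(List<T> input,
                                               Function<T, K> keyOf,
                                               BinaryOperator<T> reducer) {
        Map<K, T> state = new HashMap<>();      // last reduced value per key
        List<T> emitted = new ArrayList<>();    // one output per input record
        for (T record : input) {
            K key = keyOf.apply(record);
            T previous = state.get(key);
            // no previous value: the reducer is not called for the key's first record
            T next = (previous == null) ? record : reducer.apply(previous, record);
            state.put(key, next);
            emitted.add(next);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // hypothetical input: integers keyed by parity and summed per key
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> out = rollingReduce(input, x -> x % 2, Integer::sum);
        System.out.println(out); // prints [1, 2, 4, 6, 9]
    }
}
```

Odd numbers accumulate as 1, 4, 9 and even numbers as 2, 6. The output is interleaved in arrival order, one emission per input record, which matches what the Flink job below prints per key.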
Example environment
java.version: 1.8.x
flink.version: 1.11.1
Example data source (the project can be downloaded from Gitee)
Reduce.java
import com.flink.examples.DataSource;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
/**
 * @Description Reduce operator: applies a rolling aggregation to a keyed stream and emits the merged result after each aggregation step
 */
public class Reduce {
    /**
     * Iterates over the collection and prints the result of every rolling aggregation, per partition
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        List<Tuple3<String,String,Integer>> tuple3List = DataSource.getTuple3ToList();
        //Note: in the author's tests, keying by an Integer gave unexpected partition results; returning the key as a String printed correctly.
        //(Distinct keys can still share a parallel subtask, because Flink assigns keys to subtasks by hashing.)
        KeyedStream<Tuple3<String,String,Integer>, String> keyedStream = env.fromCollection(tuple3List).keyBy(new KeySelector<Tuple3<String,String,Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> tuple3) throws Exception {
                //f1 is the gender field; records with the same f1 value (gender) share a key
                return String.valueOf(tuple3.f1);
            }
        });
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> result = keyedStream.reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> t0, Tuple3<String, String, Integer> t1) throws Exception {
                //t0: the last reduced value for this key; t1: the newly arrived record
                int totalAge = t0.f2 + t1.f2;
                //the name (f0) is meaningless for an aggregate, so it is set to an empty string
                return new Tuple3<>("", t0.f1, totalAge);
            }
        });
        result.print();
        env.execute("flink Reduce job");
    }
}
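The note on keying by Integer versus String touches on how Flink spreads keys across the four parallel subtasks. As far as I understand Flink's `KeyGroupRangeAssignment`, the key's `hashCode()` is passed through `MathUtils.murmurHash`. The result picks one of `maxParallelism` key groups, and each key group belongs to exactly one subtask. The sketch below re-implements that mapping in plain Java so it runs without Flink. `KeyToSubtaskSketch` and `subtaskFor` are hypothetical names. A max parallelism of 128 is assumed, which should be the default for parallelism 4 when none is set. Check the logic against the source of the Flink version in use.

```java
// Plain-Java re-implementation (to my understanding) of Flink's key -> subtask mapping.
// Not a Flink API; verify against KeyGroupRangeAssignment / MathUtils in your version.
public class KeyToSubtaskSketch {

    // Intended to mirror MathUtils.bitMix: final avalanche step of the hash
    public static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    // Intended to mirror MathUtils.murmurHash(int): always returns a non-negative hash
    public static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) return code;
        if (code != Integer.MIN_VALUE) return -code;
        return 0;
    }

    // key -> key group in [0, maxParallelism) -> subtask index in [0, parallelism)
    public static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        int keyGroup = murmurHash(key.hashCode()) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed default for parallelism 4
        int parallelism = 4;
        for (Object key : new Object[] {"man", "girl", 1, "1"}) {
            // print() labels lines with the 1-based subtask index, hence the + 1
            System.out.println(key + " -> " + (subtaskFor(key, maxParallelism, parallelism) + 1) + ">");
        }
    }
}
```

The mapping only spreads keys. It does not guarantee that distinct keys get distinct subtasks, so with more keys than subtasks some keys must share an `N>` prefix. An Integer key and its String form also hash differently (`Integer.valueOf(1).hashCode()` is 1, `"1".hashCode()` is 49), so switching the key type can move a key to a different subtask.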
Output
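## Note: the first record of each key (here each key happens to land in its own partition) keeps all of its fields because the rolling reduce never calls reduce() for it. That record is stored as the initial value and emitted unchanged. From the second record on, each record is combined with the previous result. The later records show an empty name because the ReduceFunction sets f0 to "". The 2> and 4> prefixes are the 1-based index of the parallel print subtask that emitted each line.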
2> (张三,man,20)
2> (,man,49)
2> (,man,79)
4> (李四,girl,24)
4> (,girl,56)
4> (,girl,74)
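Reading each partition's output as running sums lets us recover, by subtraction, the age carried by every record after the first. The increments below are inferred from the printed totals, not taken from the data source:

```latex
\begin{aligned}
\text{man:}\quad  & 20,\quad 20 + 29 = 49,\quad 49 + 30 = 79 \\
\text{girl:}\quad & 24,\quad 24 + 32 = 56,\quad 56 + 18 = 74
\end{aligned}
```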
Copyright notice
This article was written by [不会飞的小龙人]. Please include a link to the original article when reposting. Thanks.
https://my.oschina.net/u/437309/blog/4709286