Reduce of Flink
2020-11-09 12:17:00 【Little dragon man who can't fly】
Reduce operator: performs a rolling aggregation over a keyed data stream and returns the merged result after each aggregation step.
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
Sample data source (project available for download on Gitee)
Flink example series: setting up the development environment and data
Reduce.java
import com.flink.examples.DataSource;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
/**
 * @Description Reduce operator: performs a rolling aggregation over a data stream and returns the merged result of each aggregation step.
 */
public class Reduce {
    /**
     * Iterates over the collection and prints the result of each rolling aggregation, per partition.
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        List<Tuple3<String,String,Integer>> tuple3List = DataSource.getTuple3ToList();
        // Note: partitioning by an Integer key produced incorrect partition results in this example; converting the key to String gives correct output.
        KeyedStream<Tuple3<String,String,Integer>, String> keyedStream = env.fromCollection(tuple3List).keyBy(new KeySelector<Tuple3<String,String,Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> tuple3) throws Exception {
                // f1 is the gender field; records with the same f1 value (gender) go to the same partition
                return String.valueOf(tuple3.f1);
            }
        });
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> result = keyedStream.reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> t0, Tuple3<String, String, Integer> t1) throws Exception {
                // t0 is the aggregate so far, t1 is the newly arrived record; sum their ages
                int totalAge = t0.f2 + t1.f2;
                // The name field is left empty because the result no longer describes a single person
                return new Tuple3<>("", t0.f1, totalAge);
            }
        });
        result.print();
        env.execute("flink Reduce job");
    }
}
Print the results
## Note: the first record in each partition still has every field populated because rolling aggregation only starts when the second record arrives. The first record is stored and emitted unchanged, without any call to the reduce method. From the second record on, each output is the value returned by reduce, which is why the name field is empty.
2> (Zhang San,man,20)
2> (,man,49)
2> (,man,79)
4> (Li Si,girl,24)
4> (,girl,56)
4> (,girl,74)
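The behavior described in the note can be reproduced without a Flink cluster. The following is a minimal plain-Java sketch of a keyed rolling reduce, not Flink's actual implementation. The class RollingReduceSketch, the Person stand-in for Tuple3, and the helper rollingReduce are hypothetical names introduced for illustration. Only the first record of each gender comes from the printed results; the other names are made up, and the other ages (29, 30, 32, 18) are inferred from the running totals shown above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

class RollingReduceSketch {

    /** Hypothetical stand-in for Tuple3<String, String, Integer>: (name, gender, age). */
    static final class Person {
        final String name;
        final String gender;
        final int age;

        Person(String name, String gender, int age) {
            this.name = name;
            this.gender = gender;
            this.age = age;
        }

        @Override
        public String toString() {
            return "(" + name + "," + gender + "," + age + ")";
        }
    }

    /**
     * Emulates a keyed rolling reduce, keyed by gender: one aggregate is kept
     * per key, and one result is emitted for every incoming record.
     */
    static List<String> rollingReduce(List<Person> input, BinaryOperator<Person> reducer) {
        Map<String, Person> stateByKey = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (Person current : input) {
            Person previous = stateByKey.get(current.gender);
            // First record for a key: the reducer is not called, the record is kept as-is.
            Person next = (previous == null) ? current : reducer.apply(previous, current);
            stateByKey.put(current.gender, next);
            emitted.add(next.toString());
        }
        return emitted;
    }

    /** Runs the sketch on assumed sample data and returns the emitted lines. */
    static List<String> demo() {
        // Assumed data: names after the first of each gender are invented,
        // ages are inferred from the totals in the printed results.
        List<Person> people = Arrays.asList(
                new Person("Zhang San", "man", 20),
                new Person("Li Si", "girl", 24),
                new Person("Wang Wu", "man", 29),
                new Person("Zhao Liu", "girl", 32),
                new Person("Sun Qi", "man", 30),
                new Person("Zhou Ba", "girl", 18));
        // Same logic as the ReduceFunction in Reduce.java: sum ages, blank the name.
        return rollingReduce(people, (t0, t1) -> new Person("", t0.gender, t0.age + t1.age));
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

With this input, the sketch emits the first record of each gender unchanged and then the running totals 49 and 79 for man, 56 and 74 for girl. Unlike the Flink job, it runs in a single thread, so lines appear in input order and carry no subtask prefix such as 2> or 4>.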