当前位置:网站首页>Reduce of Flink

Reduce of Flink

2020-11-09 12:17:00 Little dragon man who can't fly

Reduce operator : Rolling aggregation of data streams , And return the merged result of each rolling aggregation calculation

Sample environment

java.version: 1.8.x
flink.version: 1.11.1

Sample data source  ( Project code cloud download )

Flink System examples And Build development environment and data

Reduce.java

import com.flink.examples.DataSource;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;

/**
 * @Description Reduce operator : Rolling aggregation of data streams , And return the merged result of each rolling aggregation calculation 
 */
public class Reduce {

    /**
     *  Ergodic set , Partition printing results of each rolling aggregation 
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        List<Tuple3<String,String,Integer>> tuple3List = DataSource.getTuple3ToList();
        // Be careful : Use Integer When partitioning , It will result in incorrect partition results , convert to String Type output key The output is correct 
        KeyedStream<Tuple3<String,String,Integer>, String> keyedStream = env.fromCollection(tuple3List).keyBy(new KeySelector<Tuple3<String,String,Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> tuple3) throws Exception {
                //f1 For the gender field , With the same f1 value ( Gender ) partition 
                return String.valueOf(tuple3.f1);
            }
        });
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> result =  keyedStream.reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> t0, Tuple3<String, String, Integer> t1) throws Exception {
                int totalAge = t0.f2 + t1.f2;
                return new Tuple3<>("", t0.f1, totalAge);
            }
        });
        result.print();
        env.execute("flink Reduce job");
    }
}

Print the results

##  explain : Why each parameter of the first data object in each partition has a value , Because the rolling aggregation returns the first data object overlaid forward from the second data object , Start calculating , So the first data object doesn't enter at all reduce Method ;
2> ( Zhang San ,man,20)
2> (,man,49)
2> (,man,79)
4> ( Li Si ,girl,24)
4> (,girl,56)
4> (,girl,74)

 

版权声明
本文为[Little dragon man who can't fly]所创,转载请带上原文链接,感谢