当前位置:网站首页>Flink入门——单词统计

Flink入门——单词统计

2022-06-26 09:32:00 山顶看数据

使用批处理进行 WordCount(单词统计)

/**
 * Batch word count built on the Flink DataSet API.
 *
 * <p>Reads {@code input/a.txt}, splits every line into whitespace-separated words and prints
 * one {@code (word, count)} tuple per distinct word.
 */
public class WordCount {
    /**
     * Builds the batch pipeline and prints the aggregated counts.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> data = env.readTextFile("input/a.txt");

        // When a Java lambda is used here, .returns(Types.TUPLE(Types.STRING, Types.LONG)) must be
        // added: the lambda uses Java generics, and because of type erasure the type information
        // has to be declared explicitly.
        FlatMapOperator<String, Tuple2<String, Long>> wordsTuple = data.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            // Split on runs of whitespace and drop empty tokens, so blank lines and
            // leading/repeated spaces do not get counted as an empty "word".
            for (String word : line.split("\\s+")) {
                if (!word.isEmpty()) {
                    out.collect(Tuple2.of(word, 1L));
                }
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // Group by the word (tuple field 0) and sum the counts (tuple field 1).
        UnsortedGrouping<Tuple2<String, Long>> groupWord = wordsTuple.groupBy(0);

        AggregateOperator<Tuple2<String, Long>> result = groupWord.sum(1);

        result.print();
    }
}

使用有界流处理进行 WordCount(单词统计)

/**
 * Bounded-stream word count built on the Flink DataStream API, written step by step with
 * anonymous function classes.
 *
 * <p>Reads {@code input/a.txt}, splits every line into whitespace-separated words, keys the
 * stream by word and prints the running count for each word.
 */
public class Word {
    /**
     * Builds the streaming pipeline and executes it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        DataStream<String> source = environment.readTextFile("input/a.txt");

        SingleOutputStreamOperator<String> flatMap = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                // Split on runs of whitespace and drop empty tokens, so blank lines and
                // leading/repeated spaces do not get counted as an empty "word".
                for (String word : line.split("\\s+")) {
                    if (!word.isEmpty()) {
                        out.collect(word);
                    }
                }
            }
        });

        // Lambda alternative to the anonymous MapFunction below; it needs an explicit type hint:
        //   flatMap.map(word -> Tuple2.of(word, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        });

        // Key by the word itself through a KeySelector instead of the deprecated positional
        // keyBy(0); this also gives the keyed stream a concrete String key type.
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = map.keyBy(pair -> pair.f0);

        // Sum the counts (tuple field 1) per word.
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyedStream.sum(1);

        sum.print();

        environment.execute();
    }

}
/**
 * Bounded-stream word count built on the Flink DataStream API, written as a single chained
 * pipeline.
 *
 * <p>Reads {@code input/a.txt}, emits a {@code (word, 1)} tuple for every whitespace-separated
 * word, keys the stream by word and prints the running count for each word.
 */
public class Word {
    /**
     * Builds the streaming pipeline and executes it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        DataStream<String> source = environment.readTextFile("input/a.txt");

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                // Split on runs of whitespace and drop empty tokens, so blank lines and
                // leading/repeated spaces do not get counted as an empty "word".
                for (String word : line.split("\\s+")) {
                    if (!word.isEmpty()) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            }
        })
                // Key by the word through a KeySelector instead of the deprecated positional keyBy(0).
                .keyBy(pair -> pair.f0)
                // Sum the counts (tuple field 1) per word.
                .sum(1);

        result.print();

        environment.execute();
    }

}

原网站

版权声明
本文为[山顶看数据]所创,转载请带上原文链接,感谢
https://blog.csdn.net/li1579026891/article/details/125159853