
Setting Up a Flink Development Environment and Writing WordCount

2022-06-11 02:53:00 Xiao Pang is so fierce!

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>bigdata16</artifactId>
        <groupId>com.shujia</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>Flink</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.11.2</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.12</scala.version>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
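
The pom above targets Flink 1.11.2 built against Scala 2.11 and pulls in the streaming Scala API, the Flink client, and the Log4j 2 binding. Without a configuration file on the classpath, Log4j 2 falls back to ERROR-only console output, so runtime logs from the IDE are mostly invisible. Below is a minimal sketch of such a configuration, assuming it is saved as src/main/resources/log4j2.xml; the file name and pattern layout are illustrative, not part of the original project.

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <!-- Simple console appender so Flink's runtime logs show up in the IDE -->
            <PatternLayout pattern="%d{HH:mm:ss,SSS} %-5p %-60c - %m%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>

With the build in place, the streaming WordCount program is shown below.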
package com.shujia.core

import org.apache.flink.streaming.api.scala._

object Demo01WordCount {
  def main(args: Array[String]): Unit = {
    /**
     * Create the Flink execution environment: the entry point of every Flink program.
     */
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // The default parallelism equals the number of logical CPU cores; here the job's parallelism is set explicitly.
    env.setParallelism(2)

    /**
     * Simulate real-time data through a socket.
     * Start the source on the host first: nc -lk 8888
     *
     * DataStream is Flink's core programming model for streams.
     */
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    // Split each line into words (comma-separated)
    val wordsDS: DataStream[String] = linesDS
      .flatMap(line => line.split(","))

    // Map each word to a (word, 1) key-value pair
    val wordsKVDS: DataStream[(String, Int)] = wordsDS.map(word => (word, 1))

    // Group (key) the stream by word
    val keyByDS: KeyedStream[(String, Int), String] = wordsKVDS.keyBy(kv => kv._1)

    // Sum the counts for each word (tuple field index 1)
    val wordCntDS: DataStream[(String, Int)] = keyByDS.sum(1)

    // Print the results to stdout
    wordCntDS.print()

    // Launch the job: nothing runs until execute() is called
    env.execute("Demo01WordCount")
  }
}
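
If the socket source (a host named master running nc -lk 8888) is not available, the same pipeline can be exercised against a bounded in-memory source. The following is a minimal sketch under that assumption; the object name and the sample lines are made up for illustration.

package com.shujia.core

import org.apache.flink.streaming.api.scala._

object Demo01WordCountLocal {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    // Bounded in-memory source instead of a socket, so no nc is needed
    val linesDS: DataStream[String] = env.fromElements("flink,spark,flink", "hadoop,flink")

    linesDS
      .flatMap(_.split(","))   // split each line into words
      .map(word => (word, 1))  // (word, 1) pairs
      .keyBy(_._1)             // group by word
      .sum(1)                  // running count per word
      .print()                 // print incremental results

    env.execute("Demo01WordCountLocal")
  }
}

Because sum runs on a keyed stream, the output shows incremental counts per word, for example (flink,1) followed by (flink,2) and (flink,3) as elements arrive.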
