
Data Lake (VI): Hudi and Flink integration

2022-06-10 14:41:00 Lanson

Hudi and Flink Integration

Hudi 0.8.0 is the first Hudi version to support Flink and, based on current testing, works with Flink 1.12.x. When writing data to Hudi through Flink, checkpointing must be enabled; the written data only becomes visible in Hudi after at least 5 checkpoints have completed.
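
As a minimal sketch of this requirement (the variable name and checkpoint interval are only illustrative; the complete job appears later in this article), checkpointing is enabled directly on the streaming environment:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Enable checkpointing before writing to Hudi; data under the Hudi path only
// becomes visible after several checkpoints (at least 5) have completed.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(2000) // trigger a checkpoint every 2 seconds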

However, there are currently still some issues:

  • When running the Flink code locally to write data to Hudi, the error "java.lang.AbstractMethodError: Method org/apache/hudi/sink/StreamWriteOperatorCoordinator.notifyCheckpointComplete(J)V is abstract" may appear; this is presumably a Hudi version compatibility issue.
  • When reading the data written by Flink back with Flink, the error "Exception in thread "main" org.apache.hudi.exception.HoodieException: Get table avro schema error" appears. It is mainly caused by the previous error: the Hudi table has no commit information, so the commit metadata cannot be read.

1. Import the following packages in the Maven pom.xml

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.12.1</flink.version>
</properties>

<dependencies>
    <!-- Dependency required for Flink to operate on Hudi -->
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink-bundle_2.11</artifactId>
        <version>0.8.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Dependencies required for developing Flink jobs in Java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Dependencies required for developing Flink jobs in Scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Dependency required for reading files on HDFS -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.9.2</version>
    </dependency>

    <!-- RocksDB state backend for Flink state management -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Flink Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Flink SQL & Table API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Blink planner required when using Blink in Flink SQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

2. Flink Code for Writing Data to Hudi

    //0. Imports required by this example
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.{EnvironmentSettings, Table}
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    //1. Create the execution environment and table environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance()
      .useBlinkPlanner().inStreamingMode().build())

    //2. Checkpointing must be enabled. By default, data appears under the Hudi directory only after 5 checkpoints; otherwise there is only a .hoodie directory.
    env.enableCheckpointing(2000)
//    env.setStateBackend(new RocksDBStateBackend("hdfs://mycluster/flinkstate"))

    //3. Set the parallelism
    env.setParallelism(1)

    //4. Read data from Kafka
    tableEnv.executeSql(
      """
        | create table kafkaInputTable(
        |  id varchar,
        |  name varchar,
        |  age int,
        |  ts varchar,
        |  loc varchar
        | ) with (
        |  'connector' = 'kafka',
        |  'topic' = 'test_tp',
        |  'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
        |  'scan.startup.mode'='latest-offset',
        |  'properties.group.id' = 'testgroup',
        |  'format' = 'csv'
        | )
      """.stripMargin)

    val table: Table = tableEnv.from("kafkaInputTable")

    //5. Create the corresponding Hudi table in Flink
    tableEnv.executeSql(
      """
        |CREATE TABLE t1(
        |  id VARCHAR(20) PRIMARY KEY NOT ENFORCED, -- the primary key column defaults to uuid; PRIMARY KEY NOT ENFORCED designates this column as the primary key
        |  name VARCHAR(10),
        |  age INT,
        |  ts VARCHAR(20),
        |  loc VARCHAR(20)
        |)
        |PARTITIONED BY (loc)
        |WITH (
        |  'connector' = 'hudi',
        |  'path' = '/flink_hudi_data',
        |  'write.tasks' = '1', -- default is 4, which requires more resources
        |  'compaction.tasks' = '1', -- default is 10, which requires more resources
        |  'table.type' = 'COPY_ON_WRITE' -- MERGE_ON_READ would create a merge-on-read table; the default is COPY_ON_WRITE
        |)
      """.stripMargin)

    //6. Insert the Kafka data into the Hudi table (interpolating the Table object references the kafkaInputTable source)
    tableEnv.executeSql(
      s"""
         | insert into t1 select id,name,age,ts,loc from ${table}
      """.stripMargin)

    // Note: executeSql already submits the insert job; env.execute() is only required when DataStream operators are also defined.
    env.execute()
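
For reference, a minimal read-back sketch against the same tableEnv and the t1 table defined above could look like the following; as described at the beginning of this article, the query may fail on Hudi 0.8.0 with "Get table avro schema error" when no commit has been completed yet:

    // Query the Hudi table through the same table environment and print the rows to stdout.
    tableEnv.executeSql("select id, name, age, ts, loc from t1").print()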

Note that "PRIMARY KEY NOT ENFORCED" in the code above is optional. If it is not specified, Hudi's primary key column defaults to "uuid"; once it is specified, the chosen column is used as the primary key.
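
Alternatively, the record key can be configured through a table option instead of the DDL constraint. The sketch below assumes that Hudi's Flink option 'hoodie.datasource.write.recordkey.field' (default value uuid) is honored by this version; the table name and path are hypothetical:

    tableEnv.executeSql(
      """
        |CREATE TABLE t1_alt(      -- hypothetical table name
        |  id VARCHAR(20),
        |  name VARCHAR(10),
        |  age INT,
        |  ts VARCHAR(20),
        |  loc VARCHAR(20)
        |)
        |PARTITIONED BY (loc)
        |WITH (
        |  'connector' = 'hudi',
        |  'path' = '/flink_hudi_data_alt',  -- hypothetical path
        |  'hoodie.datasource.write.recordkey.field' = 'id',  -- use the id column as the record key
        |  'table.type' = 'COPY_ON_WRITE'
        |)
      """.stripMargin)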

Copyright notice: this article was written by [Lanson]. Please include a link to the original when reposting: https://yzsam.com/2022/161/202206101431526277.html