Data Lake (VI): Hudi and Flink integration
2022-06-10 14:41:00 【Lanson】
Hudi and Flink Integration
Hudi 0.8.0 is compatible with Flink 1.12.x and earlier versions. As currently tested, Hudi 0.8.0 is the first version to support Flink. When writing data to Hudi through Flink, checkpointing must be enabled; only after at least 5 checkpoints have completed can the corresponding data be seen in the Hudi table.
However, some problems remain. The current issues are as follows:
When running the Flink code locally to write data, the error "java.lang.AbstractMethodError: Method org/apache/hudi/sink/StreamWriteOperatorCoordinator.notifyCheckpointComplete(J)V is abstract" may appear, which is presumably a Hudi version compatibility issue. If the data written by Flink is then read back with Flink, a corresponding error appears: "Exception in thread "main" org.apache.hudi.exception.HoodieException: Get table avro schema error". This error is a consequence of the previous one: the Hudi table has no commit information, so the reader cannot load the commit metadata internally.
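Before reading the table with Flink, it helps to confirm that at least one commit has actually completed. The following is a minimal sketch (not part of the original article) that lists the Hudi timeline directory and checks for completed commit files; it assumes the table path "/flink_hudi_data" and the HDFS cluster name "mycluster" that appear later in this article.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HudiCommitCheck {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Assumed HDFS address, taken from the commented-out RocksDB path in the code below
    conf.set("fs.defaultFS", "hdfs://mycluster")
    val fs = FileSystem.get(conf)
    // Hudi keeps its timeline under <table path>/.hoodie; completed commits of a COPY_ON_WRITE table end with ".commit"
    val commits = fs.listStatus(new Path("/flink_hudi_data/.hoodie"))
      .map(_.getPath.getName)
      .filter(_.endsWith(".commit"))
    if (commits.isEmpty) {
      println("No completed commits yet - reading the table now would fail with 'Get table avro schema error'")
    } else {
      println(s"Completed commits: ${commits.mkString(", ")}")
    }
    fs.close()
  }
}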
1. Add the following dependencies to the Maven pom.xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.12.1</flink.version>
</properties>

<dependencies>
    <!-- Hudi bundle required for operating Hudi from Flink -->
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink-bundle_2.11</artifactId>
        <version>0.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Dependencies required for developing Flink jobs in Java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Dependencies required for developing Flink jobs in Scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Required for reading files on HDFS -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.9.2</version>
    </dependency>

    <!-- RocksDB state backend for Flink state management -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Flink Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Flink SQL & Table API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Packages required for using the Blink planner in Flink SQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
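The pom above only configures the Java compiler, while the example job below is written in Scala, so a Scala build plugin is also needed to compile it. The following is one possible addition (an assumption, not part of the original pom), using the widely used scala-maven-plugin; the version shown is only an example.

<build>
    <plugins>
        <!-- Assumed addition: compiles the Scala sources of the example job -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>4.5.6</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>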
2. Code for writing data to Hudi with Flink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

//1. Create the execution and table environments
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance()
  .useBlinkPlanner().inStreamingMode().build())

//2. Checkpointing must be enabled. Only after 5 checkpoints (by default) will data appear under the Hudi directory; before that there is only a .hoodie directory.
env.enableCheckpointing(2000)
//env.setStateBackend(new RocksDBStateBackend("hdfs://mycluster/flinkstate"))

//3. Set the parallelism
env.setParallelism(1)

//4. Read data from Kafka
tableEnv.executeSql(
  """
    | create table kafkaInputTable(
    |  id varchar,
    |  name varchar,
    |  age int,
    |  ts varchar,
    |  loc varchar
    | ) with (
    |  'connector' = 'kafka',
    |  'topic' = 'test_tp',
    |  'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
    |  'scan.startup.mode'='latest-offset',
    |  'properties.group.id' = 'testgroup',
    |  'format' = 'csv'
    | )
  """.stripMargin)

val table: Table = tableEnv.from("kafkaInputTable")

//5. Create the Hudi table in Flink
tableEnv.executeSql(
  """
    |CREATE TABLE t1(
    | id VARCHAR(20) PRIMARY KEY NOT ENFORCED, -- the primary key column defaults to uuid; "PRIMARY KEY NOT ENFORCED" designates this column as the primary key instead
    | name VARCHAR(10),
    | age INT,
    | ts VARCHAR(20),
    | loc VARCHAR(20)
    |)
    |PARTITIONED BY (loc)
    |WITH (
    | 'connector' = 'hudi',
    | 'path' = '/flink_hudi_data',
    | 'write.tasks' = '1', -- default is 4, which requires more resources
    | 'compaction.tasks' = '1', -- default is 10, which requires more resources
    | 'table.type' = 'COPY_ON_WRITE' -- COPY_ON_WRITE is the default; set MERGE_ON_READ to create a MOR table instead
    |)
  """.stripMargin)

//6. Insert data into the Hudi table
tableEnv.executeSql(
  s"""
    | insert into t1 select id,name,age,ts,loc from ${table}
  """.stripMargin)

// Note: the INSERT statement above is submitted by executeSql() itself, so no separate env.execute() call is required here.
Note that in the code above the "PRIMARY KEY NOT ENFORCED" clause is optional: if it is not specified, the Hudi primary key column defaults to "uuid"; once specified, the designated column is used as the primary key instead.
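Once a few checkpoints have completed and commit files exist under "/flink_hudi_data/.hoodie", the data can be queried back through the same table definition. The following is a minimal sketch (an assumption, not from the original article) that reuses the tableEnv and the t1 table created above and prints the result of a snapshot query:

// Query the Hudi table back through Flink SQL and print the rows
val result: Table = tableEnv.sqlQuery("select id, name, age, ts, loc from t1")
result.execute().print()

If this query fails with the "Get table avro schema error" mentioned earlier, the table most likely has no completed commits yet.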