Data Lake (6): Integrating Hudi with Flink
2022-06-10 14:32:00 【Lanson】
Integrating Hudi with Flink
Hudi 0.8.0 is compatible with Flink 1.12.x and above; Flink support first appears in Hudi 0.8.0. When writing data to Hudi through Flink, checkpointing must be enabled, and the written data only becomes visible in Hudi after roughly five checkpoints have completed.
There are, however, still some known issues:
1. Running the Flink job locally to write data to Hudi fails with "java.lang.AbstractMethodError: Method org/apache/hudi/sink/StreamWriteOperatorCoordinator.notifyCheckpointComplete(J)V is abstract", which is most likely a Hudi/Flink version-compatibility problem.
2. Reading the written data back with Flink then fails with "Exception in thread "main" org.apache.hudi.exception.HoodieException: Get table avro schema error". This follows from the first error: because the write never committed, Hudi has no commit metadata, so the reader cannot find any commit information.
1. Import the following dependencies in the Maven pom.xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.12.1</flink.version>
</properties>
<dependencies>
    <!-- Bundle required for Flink to operate on Hudi -->
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink-bundle_2.11</artifactId>
        <version>0.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Dependencies for developing Flink in Java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Dependencies for developing Flink in Scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Jars required for reading files from HDFS -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.9.2</version>
    </dependency>
    <!-- RocksDB state backend for Flink state management -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink SQL & Table -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Required to use the Blink planner in Flink SQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
2. Flink code for writing data to Hudi
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table}

//1. Create the execution environments
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance()
  .useBlinkPlanner().inStreamingMode().build())

//2. Checkpointing must be enabled: data appears under the Hudi path only after about five
//   checkpoints have completed; until then the path contains nothing but a .hoodie directory.
env.enableCheckpointing(2000)
// env.setStateBackend(new RocksDBStateBackend("hdfs://mycluster/flinkstate"))

//3. Set the parallelism
env.setParallelism(1)

//4. Read data from Kafka
tableEnv.executeSql(
  """
    | create table kafkaInputTable(
    |  id varchar,
    |  name varchar,
    |  age int,
    |  ts varchar,
    |  loc varchar
    | ) with (
    |  'connector' = 'kafka',
    |  'topic' = 'test_tp',
    |  'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
    |  'scan.startup.mode'='latest-offset',
    |  'properties.group.id' = 'testgroup',
    |  'format' = 'csv'
    | )
  """.stripMargin)
val table: Table = tableEnv.from("kafkaInputTable")

//5. Create the corresponding Hudi table in Flink
tableEnv.executeSql(
  """
    |CREATE TABLE t1(
    |  id VARCHAR(20) PRIMARY KEY NOT ENFORCED, -- the record key column defaults to "uuid"; appending "PRIMARY KEY NOT ENFORCED" designates this column as the record key instead
    |  name VARCHAR(10),
    |  age INT,
    |  ts VARCHAR(20),
    |  loc VARCHAR(20)
    |)
    |PARTITIONED BY (loc)
    |WITH (
    |  'connector' = 'hudi',
    |  'path' = '/flink_hudi_data',
    |  'write.tasks' = '1', -- default is 4, which requires more resources
    |  'compaction.tasks' = '1', -- default is 10, which requires more resources
    |  'table.type' = 'COPY_ON_WRITE' -- default is COPY_ON_WRITE; set to MERGE_ON_READ to create a MOR table
    |)
  """.stripMargin)

//6. Insert data into the Hudi table. executeSql submits the INSERT job itself,
//   so no env.execute() call is needed (with no DataStream operators defined, it would throw
//   "No operators defined in streaming topology").
tableEnv.executeSql(
  s"""
    | insert into t1 select id,name,age,ts,loc from ${table}
  """.stripMargin)
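For reference, the messages produced to the test_tp topic must be CSV lines matching the kafkaInputTable schema (id, name, age, ts, loc); the two records below are purely hypothetical sample values:

1,zhangsan,18,1654839120,beijing
2,lisi,20,1654839125,shanghai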
Note that "PRIMARY KEY NOT ENFORCED" in the code above is optional: if it is omitted, Hudi's record key column defaults to "uuid"; if it is specified, the named column is used as the record key instead.
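Reading the table back currently hits the "Get table avro schema error" described above, but for completeness, a minimal read-back sketch looks like the following (assuming the same /flink_hudi_data path and a Hudi table with at least one successful commit):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance()
  .useBlinkPlanner().inStreamingMode().build())

// Re-declare the same Hudi table; the schema must match what was written
tableEnv.executeSql(
  """
    |CREATE TABLE t1(
    |  id VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    |  name VARCHAR(10),
    |  age INT,
    |  ts VARCHAR(20),
    |  loc VARCHAR(20)
    |)
    |PARTITIONED BY (loc)
    |WITH (
    |  'connector' = 'hudi',
    |  'path' = '/flink_hudi_data',
    |  'table.type' = 'COPY_ON_WRITE'
    |)
  """.stripMargin)

// Query the table; TableResult.print() pulls the results to the client
tableEnv.executeSql("select * from t1").print()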