Data Lake (VI): Hudi and Flink integration
2022-06-10 14:41:00 【Lanson】
Hudi and Flink Integration
Hudi 0.8.0 is compatible with Flink 1.12.x. Based on current testing, Hudi supports Flink starting from version 0.8.0. When writing data to Hudi through Flink, checkpointing must be enabled, and the corresponding data becomes visible in Hudi only after at least 5 checkpoints have completed.
However, there are still some issues. The problems currently observed are as follows:
When running the Flink write job locally, the error "java.lang.AbstractMethodError: Method org/apache/hudi/sink/StreamWriteOperatorCoordinator.notifyCheckpointComplete(J)V is abstract" appears; this is expected to be a Hudi version compatibility issue. If the data written by Flink is then read back with Flink, the corresponding error "Exception in thread "main" org.apache.hudi.exception.HoodieException: Get table avro schema error" appears. This error is mainly a consequence of the previous one: because of it, Hudi has no commit information, and the read fails internally when the commit information cannot be found.
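For reference, the read path that triggers the second error is simply a Flink SQL query against the Hudi table. Below is a minimal sketch; it assumes the same tableEnv and the t1 Hudi table defined in section Two of this post, and that the table is registered in the current session before being queried:
// Register the Hudi table for reading, then query it. If no commit exists
// under the table path yet, this query is where "Get table avro schema error" is thrown.
tableEnv.executeSql(
  """
    |CREATE TABLE t1(
    | id VARCHAR(20),
    | name VARCHAR(10),
    | age INT,
    | ts VARCHAR(20),
    | loc VARCHAR(20)
    |)
    |PARTITIONED BY (loc)
    |WITH (
    | 'connector' = 'hudi',
    | 'path' = '/flink_hudi_data',
    | 'table.type' = 'COPY_ON_WRITE'
    |)
  """.stripMargin)
tableEnv.executeSql("select * from t1").print()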
One. Add the following dependencies to the Maven pom.xml
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.12.1</flink.version>
</properties>
<dependencies>
<!-- Dependency required for Flink to operate on Hudi -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink-bundle_2.11</artifactId>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Dependencies required for developing Flink with Java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Dependencies required for developing Flink with Scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Jar needed for reading files from HDFS -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.9.2</version>
</dependency>
<!-- RocksDB state backend dependency for Flink state management -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Kafka Connector dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.12.1</version>
</dependency>
<!-- Flink SQL & Table API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Dependency required to use the Blink planner with Flink SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
Two. Flink code for writing data to Hudi
// Imports needed by the example (DataStream Scala API and the Scala Table API bridge)
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

//1. Create the stream execution environment and the table environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance()
  .useBlinkPlanner().inStreamingMode().build())
//2. Checkpointing must be enabled. By default, data appears under the Hudi table directory only after 5 checkpoints have completed; otherwise there is only a .hoodie directory.
env.enableCheckpointing(2000)
// env.setStateBackend(new RocksDBStateBackend("hdfs://mycluster/flinkstate"))
//3. Set parallelism
env.setParallelism(1)
//4. Read data from Kafka
tableEnv.executeSql(
"""
| create table kafkaInputTable(
| id varchar,
| name varchar,
| age int,
| ts varchar,
| loc varchar
| ) with (
| 'connector' = 'kafka',
| 'topic' = 'test_tp',
| 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
| 'scan.startup.mode'='latest-offset',
| 'properties.group.id' = 'testgroup',
| 'format' = 'csv'
| )
""".stripMargin)
val table: Table = tableEnv.from("kafkaInputTable")
//5. Create the corresponding Hudi table in Flink
tableEnv.executeSql(
"""
|CREATE TABLE t1(
| id VARCHAR(20) PRIMARY KEY NOT ENFORCED, -- the default primary key column is uuid; "PRIMARY KEY NOT ENFORCED" makes id the primary key column here
| name VARCHAR(10),
| age INT,
| ts VARCHAR(20),
| loc VARCHAR(20)
|)
|PARTITIONED BY (loc)
|WITH (
| 'connector' = 'hudi',
| 'path' = '/flink_hudi_data',
| 'write.tasks' = '1', -- default is 4, which requires more resources
| 'compaction.tasks' = '1', -- default is 10, which requires more resources
| 'table.type' = 'COPY_ON_WRITE' -- default is COPY_ON_WRITE; set to MERGE_ON_READ to create a MOR table
|)
""".stripMargin)
//6. Insert data into table
tableEnv.executeSql(
s"""
| insert into t1 select id,name,age,ts,loc from ${table}
""".stripMargin)
env.execute()
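To exercise the pipeline end to end, you can push CSV records matching the kafkaInputTable schema (id,name,age,ts,loc) into the test_tp topic. Below is a minimal sketch using the plain Kafka producer API; it assumes kafka-clients is available on the classpath (pulled in transitively by flink-connector-kafka), and the record values are made-up sample data:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SendTestData {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // One CSV line per record: id,name,age,ts,loc
    producer.send(new ProducerRecord[String, String]("test_tp", "1,zs,18,1654842000,beijing"))
    producer.send(new ProducerRecord[String, String]("test_tp", "2,ls,20,1654842001,shanghai"))
    producer.flush()
    producer.close()
  }
}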
Note that in the Hudi table DDL above, "PRIMARY KEY NOT ENFORCED" is optional. If it is not specified, the Hudi primary key column defaults to "uuid"; once it is specified, the chosen column is used as the primary key.
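If you prefer not to declare a primary key in the DDL, the record key column can also be set through a connector option. Below is a minimal sketch; it assumes the 'hoodie.datasource.write.recordkey.field' option (the option Hudi's Flink connector uses for the record key, whose default value is uuid) and a hypothetical table path /flink_hudi_data2:
tableEnv.executeSql(
  """
    |CREATE TABLE t2(
    | id VARCHAR(20),
    | name VARCHAR(10),
    | age INT,
    | ts VARCHAR(20),
    | loc VARCHAR(20)
    |)
    |PARTITIONED BY (loc)
    |WITH (
    | 'connector' = 'hudi',
    | 'path' = '/flink_hudi_data2',
    | 'hoodie.datasource.write.recordkey.field' = 'id', -- record key set via option instead of PRIMARY KEY
    | 'write.tasks' = '1',
    | 'compaction.tasks' = '1',
    | 'table.type' = 'COPY_ON_WRITE'
    |)
  """.stripMargin)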