Data Lake (VI): Hudi and Flink integration
2022-06-10 14:41:00 【Lanson】
Hudi and Flink Integration
Hudi 0.8.0 is compatible with Flink 1.12.x and above; based on current testing, Hudi started to support Flink from version 0.8.0. When writing data to Hudi through Flink, checkpointing must be enabled, and the corresponding data only becomes visible in Hudi after at least 5 checkpoints have completed.
However, there are still some problems. The currently known issues are as follows:
1. When running the Flink code locally to write data to Hudi, the error "java.lang.AbstractMethodError: Method org/apache/hudi/sink/StreamWriteOperatorCoordinator.notifyCheckpointComplete(J)V is abstract" is thrown; this is expected to be a Hudi version compatibility issue.
2. If the data written by Flink is then read back with Flink, the following error appears: "Exception in thread "main" org.apache.hudi.exception.HoodieException: Get table avro schema error". This error is a consequence of the previous one: because of it, Hudi has no commit information, and the commit metadata cannot be read when the table is read internally.
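The second error is triggered by reading the Hudi table back with Flink SQL. Below is a minimal sketch of that read path; it reuses the t1 definition and the /flink_hudi_data path from the write code later in this article, and the ReadHudiTable object name is only for illustration. If the write job never produced a completed commit, the final query fails with the "Get table avro schema error" above.

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object ReadHudiTable {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env,
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build())

    // Same DDL as on the write side; reading only needs the connector, path and table type.
    tableEnv.executeSql(
      """
        |CREATE TABLE t1(
        |  id VARCHAR(20) PRIMARY KEY NOT ENFORCED,
        |  name VARCHAR(10),
        |  age INT,
        |  ts VARCHAR(20),
        |  loc VARCHAR(20)
        |)
        |PARTITIONED BY (loc)
        |WITH (
        |  'connector' = 'hudi',
        |  'path' = '/flink_hudi_data',
        |  'table.type' = 'COPY_ON_WRITE'
        |)
      """.stripMargin)

    // With no completed commit under /flink_hudi_data, this fails with
    // "org.apache.hudi.exception.HoodieException: Get table avro schema error".
    tableEnv.executeSql("select * from t1").print()
  }
}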
1. Import the following dependencies in the Maven pom.xml
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.12.1</flink.version>
</properties>
<dependencies>
<!-- Dependency required for Flink to operate on Hudi -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink-bundle_2.11</artifactId>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Dependencies required for developing Flink in Java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Dependencies required for developing Flink in Scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Jar package required to read files from HDFS -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.9.2</version>
</dependency>
<!-- RocksDB state backend dependency for Flink state management -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Kafka Connector dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.12.1</version>
</dependency>
<!-- Flink SQL & Table-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Package required to use the Blink planner in Flink SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
2. Flink code for writing data to Hudi
//1. Create the stream execution environment and the table environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,EnvironmentSettings.newInstance()
.useBlinkPlanner().inStreamingMode().build())
import org.apache.flink.streaming.api.scala._
//2. Checkpointing must be enabled. Only after 5 checkpoints (by default) will data appear in the hudi directory; before that there is only a .hoodie directory.
env.enableCheckpointing(2000)
// env.setStateBackend(new RocksDBStateBackend("hdfs://mycluster/flinkstate"))
//3. Set parallelism
env.setParallelism(1)
//4. Read data from Kafka (a sketch for producing a test CSV record to this topic follows the code below)
tableEnv.executeSql(
"""
| create table kafkaInputTable(
| id varchar,
| name varchar,
| age int,
| ts varchar,
| loc varchar
| ) with (
| 'connector' = 'kafka',
| 'topic' = 'test_tp',
| 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
| 'scan.startup.mode'='latest-offset',
| 'properties.group.id' = 'testgroup',
| 'format' = 'csv'
| )
""".stripMargin)
val table: Table = tableEnv.from("kafkaInputTable")
//5. Create the Hudi table corresponding to the Flink table
tableEnv.executeSql(
"""
|CREATE TABLE t1(
|  id VARCHAR(20) PRIMARY KEY NOT ENFORCED, -- the record key column defaults to uuid; "PRIMARY KEY NOT ENFORCED" designates this column as the record key instead
| name VARCHAR(10),
| age INT,
| ts VARCHAR(20),
| loc VARCHAR(20)
|)
|PARTITIONED BY (loc)
|WITH (
| 'connector' = 'hudi',
| 'path' = '/flink_hudi_data',
|  'write.tasks' = '1', -- default is 4, requires more resources
|  'compaction.tasks' = '1', -- default is 10, requires more resources
|  'table.type' = 'COPY_ON_WRITE' -- default is COPY_ON_WRITE; set to MERGE_ON_READ to create a MERGE_ON_READ table
|)
""".stripMargin)
//6. Insert data into table
tableEnv.executeSql(
s"""
| insert into t1 select id,name,age,ts,loc from ${table}
""".stripMargin)
env.execute()
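For an end-to-end test, a CSV record matching the kafkaInputTable schema (id, name, age, ts, loc) can be pushed into the test_tp topic. The following is a minimal sketch, not from the original article: the sample values and the ProduceTestRecord object name are made up for illustration, and the kafka-clients classes are assumed to be available transitively through flink-connector-kafka.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProduceTestRecord {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // One CSV line per record; sample values are made up, columns must be in the order id,name,age,ts,loc
    producer.send(new ProducerRecord[String, String]("test_tp", "1,zhangsan,20,1000,beijing"))
    producer.flush()
    producer.close()
  }
}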
Note the "PRIMARY KEY NOT ENFORCED" clause in the CREATE TABLE t1 statement above: it can be omitted, in which case the Hudi record key column defaults to "uuid"; when it is specified, the column of your choice is used as the record key.
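For comparison, here is a sketch (not from the original code) of the same table without the primary key clause: since the record key field then defaults to "uuid", the schema needs an actual uuid column. The table name t2 and the path /flink_hudi_data_t2 are placeholders.

// Hedged sketch: without "PRIMARY KEY NOT ENFORCED", Hudi falls back to its default record key field "uuid"
tableEnv.executeSql(
  """
    |CREATE TABLE t2(
    |  uuid VARCHAR(20),   -- picked up as the record key because it matches the default field name
    |  name VARCHAR(10),
    |  age INT,
    |  ts VARCHAR(20),
    |  loc VARCHAR(20)
    |)
    |PARTITIONED BY (loc)
    |WITH (
    |  'connector' = 'hudi',
    |  'path' = '/flink_hudi_data_t2',  -- placeholder path for illustration
    |  'table.type' = 'COPY_ON_WRITE'
    |)
  """.stripMargin)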