当前位置:网站首页>数据湖(十八):Flink与Iceberg整合SQL API操作
数据湖(十八):Flink与Iceberg整合SQL API操作
2022-07-30 11:31:00 【Lanson】
Flink与Iceberg整合SQL API操作
Flink SQL 在操作Iceberg时,对应的版本为Flink 1.11.x 与Iceberg0.11.1版本,目前,Flink1.14.2版本与Iceberg0.12.1版本对于SQL API 来说兼容有问题,所以这里使用Flink1.11.6版本与Iceberg0.11.1版本来演示Flink SQL API 操作Iceberg。
一、SQL API 创建Iceberg表并写入数据
1、创建新项目,导入如下maven依赖包
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!-- flink 1.11.x 与Iceberg 0.11.1 合适-->
<flink.version>1.11.6</flink.version>
<hadoop.version>3.2.2</hadoop.version>
</properties>
<dependencies>
<!-- Flink 操作Iceberg 需要的Iceberg依赖 -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime</artifactId>
<version>0.11.1</version>
</dependency>
<!-- java 开发Flink 所需依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Kafka连接器的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 读取hdfs文件需要jar包-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- Flink SQL & Table-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- log4j 和slf4j 包,如果在控制台不想看到日志,可以将下面的包注释掉-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
2、编写Flink SQL 创建Iceberg表并写入数据
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);
//1.创建Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
"'type'='iceberg'," +
"'catalog-type'='hadoop'," +
"'warehouse'='hdfs://mycluster/flink_iceberg')");
//2.使用当前Catalog
tblEnv.useCatalog("hadoop_iceberg");
//3.创建数据库
tblEnv.executeSql("create database iceberg_db");
//4.使用数据库
tblEnv.useDatabase("iceberg_db");
//5.创建iceberg表 flink_iceberg_tbl
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl2(id int,name string,age int,loc string) partitioned by (loc)");
//6.写入数据到表 flink_iceberg_tbl
tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 values (1,'zs',18,'beijing'),(2,'ls',19,'shanghai'),(3,'ww',20,'guangzhou')");
3、在Hive中映射Iceberg表并查询
在Hive中执行如下命令创建对应的Iceberg表:
#在Hive中创建Iceberg表
CREATE TABLE flink_iceberg_tbl2 (
id int,
name string,
age int,
loc string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/flink_iceberg/iceberg_db/flink_iceberg_tbl2'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');
#在Hive中查询Iceberg表中的数据
hive> select * from flink_iceberg_tbl2;
OK
3 ww 20 guangzhou
1 zs 18 beijing
2 ls 19 shanghai
二、SQL API 批量查询Iceberg表数据
Flink SQL API 批量查询Iceberg表数据,直接查询显示即可。代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);
//1.创建Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
"'type'='iceberg'," +
"'catalog-type'='hadoop'," +
"'warehouse'='hdfs://mycluster/flink_iceberg')");
//2.批量读取表数据
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 ");
tableResult.print();
结果如下:

三、SQL API 实时查询Iceberg表数据
Flink SQL API 实时查询Iceberg表数据时需要设置参数“table.dynamic-table-options.enabled”为true,以支持SQL语法中的“OPTIONS”选项,代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);
Configuration configuration = tblEnv.getConfig().getConfiguration();
// 支持SQL语法中的 OPTIONS 选项
configuration.setBoolean("table.dynamic-table-options.enabled", true);
//1.创建Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
"'type'='iceberg'," +
"'catalog-type'='hadoop'," +
"'warehouse'='hdfs://mycluster/flink_iceberg')");
//2.从Iceberg表当前快照读取所有数据,并继续增量读取数据
// streaming指定为true支持实时读取数据,monitor_interval 监控数据的间隔,默认1s
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");
tableResult.print();
启动以上代码后,可以看到会将目前存在于Iceberg表中的数据读取出来,向Hive中对应的Iceberg表中插入数据,可以看到控制台实时获取数据。
#在向Hive的Iceberg表中插入数据之前需要加入以下两个包:
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;
#向Hive 中Iceberg 表插入两条数据
hive> insert into flink_iceberg_tbl2 values (4,'ml',30,'shenzhen'),(5,'tq',31,'beijing');
在控制台可以看到实时新增数据

四、SQL API指定基于快照实时增量读取数据
Flink SQL API 还支持基于某个snapshot-id来继续实时获取数据,代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);
Configuration configuration = tblEnv.getConfig().getConfiguration();
// 支持SQL语法中的 OPTIONS 选项
configuration.setBoolean("table.dynamic-table-options.enabled", true);
//1.创建Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
"'type'='iceberg'," +
"'catalog-type'='hadoop'," +
"'warehouse'='hdfs://mycluster/flink_iceberg')");
//2.从Iceberg 指定的快照继续实时读取数据,快照ID从对应的元数据中获取
//start-snapshot-id :快照ID
TableResult tableResult2 = tblEnv.executeSql("SELECT * FROM hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/");
tableResult2.print();
边栏推荐
- 概率论的学习和整理--番外4: 关于各种平均数:算术平均数,几何平均数,调和平均数,以及加权平均数和平方平均数 (未完成)
- 英 文 换 行
- Scheduling of combined electric-heating system based on multi-objective two-stage stochastic programming method
- Matlab基础(1)——基础知识
- ORA-00600 [13013], [5001], [268] 问题分析及恢复
- 【ASP.NET Core】选项类的依赖注入
- 【JZ64 求1+2+3+...+n】
- 基于声信道分析的电缆隧道人员定位技术
- Get the original data API on 1688app
- 我又造了个轮子:GrpcGateway
猜你喜欢

【32. 图中的层次(图的广度优先遍历)】

嵌入式环境下并发控制与线程安全

明德扬FPGA开发板XILINX-K7核心板Kintex7 XC7K325 410T工业级

单片机工程师笔试题目归纳汇总

电压继电器SRMUVS-100VAC-2H2D

stm32 RTC闹钟唤醒低功耗模式

Reverse linked list - iterative inversion method

The battle-hardened programmer was also deceived by a fake programmer from a certain fish. The trust between programmers should be the highest, and he alone destroyed this sense of trust

LeetCode_236_Last Common Ancestor of a Binary Tree

HJY-F931A/YJ三相电压继电器
随机推荐
Matlab基础(1)——基础知识
Vim plugin GrepIt
stm32 RTC闹钟唤醒低功耗模式
云原生应用的概念和云原生应用的 15 个特征
Interviewer: Redis bloom filter and the cuckoo in the filter, how much do you know?
美团内推+校招笔试题+知识点总结
重写并自定义依赖的原生的Bean方法
Get the original data API on 1688app
mapbox-gl开发教程(十四):画圆技巧
[Cloud-Building Co-creation] Huawei Cloud and Hongmeng collaborate to cultivate innovative developers
【32. 图中的层次(图的广度优先遍历)】
TensorFlow自定义训练函数
API 网关 APISIX 在Google Cloud T2A 和 T2D 的性能测试
Difference between C# enumeration type and xaml
TensorFlow自定义训练函数
流水线上的农民:我在工厂种蔬菜
程序环境和预处理(详解)
Manage reading notes upward
C#调用explorer.exe打开指定目录
听到'演员工作比工人辛苦,吃得也不如工人好?'我笑了