当前位置:网站首页>数据湖(十五):Spark与Iceberg整合写操作
数据湖(十五):Spark与Iceberg整合写操作
2022-07-07 21:53:00 【Lansonli】
文章目录
2、使用MERGE INTO 语法向目标表更新、删除、新增数据
3、INSERT OVERWRITE
Spark与Iceberg整合写操作
一、INSERT INTO
"insert into"是向Iceberg表中插入数据,有两种语法形式:"INSERT INTO tbl VALUES (1,"zs",18),(2,"ls",19)"、"INSERT INTO tbl SELECT ...",以上两种方式比较简单,这里不再详细记录。
二、MERGE INTO
Iceberg "merge into"语法可以对表数据进行行级更新或删除,在Spark3.x版本之后支持,其原理是重写包含需要删除和更新行数据所在的data files。"merge into"可以使用一个查询结果数据来更新目标表的数据,其语法通过类似join关联方式,根据指定的匹配条件对匹配的行数据进行相应操作。"merge into"语法如下:
MERGE INTO tbl t
USING (SELECT ...) s
ON t.id = s.id
WHEN MATCHED AND ... THEN DELETE //删除
WHEN MATCHED AND ... THEN UPDATE SET ... //更新
WHEN MATCHED AND ... AND ... THEN UPDATE SET ... //多条件更新
WHEN NOT MATCHED ADN ... THEN INSERT (col1,col2...) VALUES(s.col1,s.col2 ...)//匹配不上向目标表插入数据
具体案例如下:
1、首先创建a表和b表,并插入数据
val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
//指定hadoop catalog,catalog名称为hadoop_prod
.config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_prod.type", "hadoop")
.config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.getOrCreate()
//创建一张表 a ,并插入数据
spark.sql(
"""
|create table hadoop_prod.default.a (id int,name string,age int) using iceberg
""".stripMargin)
spark.sql(
"""
|insert into hadoop_prod.default.a values (1,"zs",18),(2,"ls",19),(3,"ww",20)
""".stripMargin)
//创建另外一张表b ,并插入数据
spark.sql(
"""
|create table hadoop_prod.default.b (id int,name string,age int,tp string) using iceberg
""".stripMargin)
spark.sql(
"""
|insert into hadoop_prod.default.b values (1,"zs",30,"delete"),(2,"李四",31,"update"),(4,"王五",32,"add")
""".stripMargin)
2、使用MERGE INTO 语法向目标表更新、删除、新增数据
这里我们计划将b表与a表匹配id,如果b表中tp字段是"delete"那么a表中对应的id数据删除,如果b表中tp字段是"update",那么a表中对应的id数据其他字段进行更新,如果a表与b表id匹配不上,那么将b表中的数据插入到a表中,具体操作如下:
//将表b 中与表a中相同id的数据更新到表a,表a中没有表b中有的id对应数据写入增加到表a
spark.sql(
"""
|merge into hadoop_prod.default.a t1
|using (select id,name ,age,tp from hadoop_prod.default.b) t2
|on t1.id = t2.id
|when matched and t2.tp = 'delete' then delete
|when matched and t2.tp = 'update' then update set t1.name = t2.name,t1.age = t2.age
|when not matched then insert (id,name,age) values (t2.id,t2.name,t2.age)
""".stripMargin)
spark.sql("""select * from hadoop_prod.default.a """).show()
最终结果如下:
注意:更新数据时,在查询的数据中只能有一条匹配的数据更新到目标表,否则将报错。
3、INSERT OVERWRITE
"insert overwrite"可以覆盖Iceberg表中的数据,这种操作会将表中全部数据替换掉,建议如果有部分数据替换操作可以使用"merge into"操作。
对于Iceberg分区表使用"insert overwrite"操作时,有两种情况,第一种是“动态覆盖”,第二种是“静态覆盖”。
- 动态分区覆盖:
动态覆盖会全量将原有数据覆盖,并将新插入的数据根据Iceberg表分区规则自动分区,类似Hive中的动态分区。
- 静态分区覆盖:
静态覆盖需要在向Iceberg中插入数据时需要手动指定分区,如果当前Iceberg表存在这个分区,那么只有这个分区的数据会被覆盖,其他分区数据不受影响,如果Iceberg表不存在这个分区,那么相当于给Iceberg表增加了个一个分区。具体操作如下:
3.1、创建三张表
创建test1分区表、test2普通表、test3普通表三张表,并插入数据,每张表字段相同,但是插入数据不同。
//创建 test1 分区表,并插入数据
spark.sql(
"""
|create table hadoop_prod.default.test1 (id int,name string,loc string)
|using iceberg
|partitioned by (loc)
""".stripMargin)
spark.sql(
"""
|insert into hadoop_prod.default.test1 values (1,"zs","beijing"),(2,"ls","shanghai")
""".stripMargin)
//创建 test2 普通表,并插入数据
spark.sql(
"""
|create table hadoop_prod.default.test2 (id int,name string,loc string)
|using iceberg
""".stripMargin)
spark.sql(
"""
|insert into hadoop_prod.default.test2 values (10,"x1","shandong"),(11,"x2","hunan")
""".stripMargin)
//创建 test3 普通表,并插入数据
spark.sql(
"""
|create table hadoop_prod.default.test3 (id int,name string,loc string)
|using iceberg
""".stripMargin)
spark.sql(
"""
|insert into hadoop_prod.default.test3 values (3,"ww","beijing"),(4,"ml","shanghai"),(5,"tq","guangzhou")
""".stripMargin)
3.2、使用insert overwrite 读取test3表中的数据覆盖到test2表中
//使用insert overwrite 读取test3 表中的数据覆盖到test2 普通表中
spark.sql(
"""
|insert overwrite hadoop_prod.default.test2
|select id,name,loc from hadoop_prod.default.test3
""".stripMargin)
//查询 test2 表中的数据
spark.sql(
"""
|select * from hadoop_prod.default.test2
""".stripMargin).show()
Iceberg 表 test2结果如下:
3.3、使用insert overwrite 读取test3表数据,动态分区方式覆盖到表test1
// 使用insert overwrite 读取test3表数据 动态分区方式覆盖到表 test1
spark.sql(
"""
|insert overwrite hadoop_prod.default.test1
|select id,name,loc from hadoop_prod.default.test3
""".stripMargin)
//查询 test1 表数据
spark.sql(
"""
|select * from hadoop_prod.default.test1
""".stripMargin).show()
Iceberg 表 test1结果如下:
3.4、静态分区方式,将iceberg表test3的数据覆盖到Iceberg表test1中
这里可以将test1表删除,然后重新创建,加载数据,也可以直接读取test3中的数据静态分区方式更新到test1。另外,使用insert overwrite 语法覆盖静态分区方式时,查询的语句中就不要再次写入分区列,否则会重复。
//删除表test1,重新创建表test1 分区表,并插入数据
spark.sql(
"""
|drop table hadoop_prod.default.test1
""".stripMargin)
spark.sql(
"""
|create table hadoop_prod.default.test1 (id int,name string,loc string)
|using iceberg
|partitioned by (loc)
""".stripMargin)
spark.sql(
"""
|insert into hadoop_prod.default.test1 values (1,"zs","beijing"),(2,"ls","shanghai")
""".stripMargin)
spark.sql("select * from hadoop_prod.default.test1").show()
Iceberg 表 test1结果如下:
//注意:指定静态分区"jiangsu",静态分区下,就不要在查询 “loc" 列了,否则重复
spark.sql(
"""
|insert overwrite hadoop_prod.default.test1
|partition (loc = "jiangsu")
|select id,name from hadoop_prod.default.test3
""".stripMargin)
//查询 test1 表数据
spark.sql(
"""
|select * from hadoop_prod.default.test1
""".stripMargin).show()
Iceberg 表 test1结果如下:
注意:使用insert overwrite 读取test3表数据 静态分区方式覆盖到表 test1,表中其他分区数据不受影响,只会覆盖指定的静态分区数据。
四、DELETE FROM
Spark3.x版本之后支持"Delete from"可以根据指定的where条件来删除表中数据。如果where条件匹配Iceberg表一个分区的数据,Iceberg仅会修改元数据,如果where条件匹配的表的单个行,则Iceberg会重写受影响行所在的数据文件。具体操作如下:
//创建表 delete_tbl ,并加载数据
spark.sql(
"""
|create table hadoop_prod.default.delete_tbl (id int,name string,age int) using iceberg
|""".stripMargin)
spark.sql(
"""
|insert into hadoop_prod.default.delete_tbl values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tq",22),(6,"gb",23)
""".stripMargin)
//根据条件范围删除表 delete_tbl 中的数据
spark.sql(
"""
|delete from hadoop_prod.default.delete_tbl where id >3 and id <6
""".stripMargin)
spark.sql("select * from hadoop_prod.default.delete_tbl").show()
Iceberg 表 delete_tbl结果如下:
//根据条件删除表 delete_tbl 中的一条数据
spark.sql(
"""
|delete from hadoop_prod.default.delete_tbl where id = 2
""".stripMargin)
spark.sql("select * from hadoop_prod.default.delete_tbl").show()
Iceberg 表 delete_tbl结果如下:
五、UPDATE
Spark3.x+版本支持了update更新数据操作,可以根据匹配的条件进行数据更新操作。
操作如下:
//创建表 delete_tbl ,并加载数据
spark.sql(
"""
|create table hadoop_prod.default.update_tbl (id int,name string,age int) using iceberg
|""".stripMargin)
spark.sql(
"""
|insert into hadoop_prod.default.update_tbl values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tq",22),(6,"gb",23)
""".stripMargin)
通过“update”更新表中id小于等于3的数据name列改为“zhangsan”,age列改为30,操作如下:
//更新 delete_tbl 表
spark.sql(
"""
|update hadoop_prod.default.update_tbl set name = 'zhangsan' ,age = 30
|where id <=3
""".stripMargin)
spark.sql(
"""
|select * from hadoop_prod.default.update_tbl
""".stripMargin).show()
Iceberg 表 update_tbl结果如下:
六、DataFrame API 写入Iceberg表
Spark向Iceberg中写数据时不仅可以使用SQL方式,也可以使用DataFrame Api方式操作Iceberg,建议使用SQL方式操作。
DataFrame创建Iceberg表分为创建普通表和分区表,创建分区表时需要指定分区列,分区列可以是多个列。创建表的语法如下:
df.write(tbl).create() 相当于 CREATE TABLE AS SELECT ...
df.write(tbl).replace() 相当于 REPLACE TABLE AS SELECT ...
df.write(tbl).append() 相当于 INSERT INTO ...
df.write(tbl).overwritePartitions() 相当于动态 INSERT OVERWRITE ...
具体操作如下:
//1.准备数据,使用DataFrame Api 写入Iceberg表及分区表
val nameJsonList = List[String](
"{\"id\":1,\"name\":\"zs\",\"age\":18,\"loc\":\"beijing\"}",
"{\"id\":2,\"name\":\"ls\",\"age\":19,\"loc\":\"shanghai\"}",
"{\"id\":3,\"name\":\"ww\",\"age\":20,\"loc\":\"beijing\"}",
"{\"id\":4,\"name\":\"ml\",\"age\":21,\"loc\":\"shanghai\"}")
import spark.implicits._
val df: DataFrame = spark.read.json(nameJsonList.toDS)
//创建普通表df_tbl1,并将数据写入到Iceberg表,其中DF中的列就是Iceberg表中的列
df.writeTo("hadoop_prod.default.df_tbl1").create()
//查询表 hadoop_prod.default.df_tbl1 中的数据,并查看数据存储结构
spark.read.table("hadoop_prod.default.df_tbl1").show()
Iceberg 表 df_tbl1结果如下:
Iceberg 表 df_tbl1存储如下:
//创建分区表df_tbl2,并将数据写入到Iceberg表,其中DF中的列就是Iceberg表中的列
df.sortWithinPartitions($"loc")//写入分区表,必须按照分区列进行排序
.writeTo("hadoop_prod.default.df_tbl2")
.partitionedBy($"loc")//这里可以指定多个列为联合分区
.create()
//查询分区表 hadoop_prod.default.df_tbl2 中的数据,并查看数据存储结构
spark.read.table("hadoop_prod.default.df_tbl2").show()
Iceberg 分区表 df_tbl2结果如下:
Iceberg 分区表 df_tbl2存储如下:
- 博客主页:https://lansonli.blog.csdn.net
- 欢迎点赞 收藏 留言 如有错误敬请指正!
- 本文由 Lansonli 原创,首发于 CSDN博客
- 停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活
边栏推荐
- Class C design questions
- Slam interview summary
- Flash download setup
- Mobile heterogeneous computing technology - GPU OpenCL programming (basic)
- P1067 [noip2009 popularity group] polynomial output (difficult, pit)
- Markdown
- Possible SQL for Oracle table lookup information
- Rock-paper-scissors
- 网上买基金安全么?
- Codeworks 5 questions per day (average 1500) - day 8
猜你喜欢
Balanced binary tree [AVL tree] - insert, delete
SAP HR reward and punishment information export
MySQL架构
C number of words, plus ¥, longest word, average value
HB 5469民用飞机机舱内部非金属材料燃烧试验方法
Interface
Pycharm basic settings latest version 2022
Map operation execution process
【路径规划】使用垂距限值法与贝塞尔优化A星路径
Anxinco EC series modules are connected to the multi protocol access products of onenet Internet of things open platform
随机推荐
Extract the file name under the folder under win
8.31 Tencent interview
【路径规划】使用垂距限值法与贝塞尔优化A星路径
C语言学习
StringUtils工具类
2022 certified surveyors are still at a loss when preparing for the exam? Teach you how to take the exam hand in hand?
HB 5469民用飞机机舱内部非金属材料燃烧试验方法
95.(cesium篇)cesium动态单体化-3D建筑物(楼栋)
SAP HR family member information
SAP HR social work experience 0023
Live server usage
P5594 [xr-4] simulation match
Chisel tutorial - 05 Sequential logic in chisel (including explicit multi clock, explicit synchronous reset and explicit asynchronous reset)
Where are you going
Download AWS toolkit pycharm
How can we make money by making video clips from our media?
【7.4】25. K 个一组翻转链表
[stm32+esp8266 connects to Tencent cloud IOT development platform 3] stm32+esp8266-01s dynamically registers devices on Tencent cloud (at instruction mode) -- with source code
HDU - 1260 tickets (linear DP)
Balanced binary tree [AVL tree] - insert, delete