Data Lake (13): Spark and Iceberg Integration DDL Operations
2022-07-04 14:28:00 【Lansonli】
Table of contents
Spark and Iceberg Integration: DDL Operations
1. CREATE TABLE: creating tables
2. CREATE TABLE ... AS SELECT
2.1 Create the table hadoop_prod.default.mytbl and insert data
2.2 Create the table mytbl2 with "create table ... as select" and query it
3. REPLACE TABLE ... AS SELECT
3.1 Create the table "hadoop_prod.default.mytbl3", insert data, and display it
3.2 Rebuild the table "hadoop_prod.default.mytbl2" from mytbl3 and insert the corresponding data
4. DROP TABLE
5. ALTER TABLE
6. ALTER TABLE partition operations
6.1 Create the table mytbl and insert data
6.2 Add the loc column as a partition column, insert data, and query
6.3 Add the ts column as a partition column via a partition transform, insert data, and query
6.4 Drop the loc partition field
6.5 Drop the years(ts) partition field
Spark and Iceberg Integration: DDL Operations
Here a Hadoop catalog is used to demonstrate the DDL operations of Spark integrated with Iceberg.
1. CREATE TABLE: creating tables
CREATE TABLE creates an Iceberg table. It can create not only ordinary tables but also partitioned tables. When inserting a batch of data into a partitioned table, the rows must be sorted by the partition column, otherwise an "Already closed files for partition" error is thrown. The code is as follows:
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
  // Register a Hadoop catalog; the catalog name is hadoop_prod
  .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
  .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
  .getOrCreate()

// Create an ordinary (non-partitioned) table
spark.sql(
  """
    | create table if not exists hadoop_prod.default.normal_tbl(id int,name string,age int) using iceberg
  """.stripMargin)

// Create a partitioned table with the loc column as the partition field
spark.sql(
  """
    |create table if not exists hadoop_prod.default.partition_tbl(id int,name string,age int,loc string) using iceberg partitioned by (loc)
  """.stripMargin)

// When inserting data into a partitioned table, the rows must be sorted by the partition column,
// otherwise the error java.lang.IllegalStateException: Already closed files for partition: xxx is thrown
spark.sql(
  """
    |insert into table hadoop_prod.default.partition_tbl values (1,"zs",18,"beijing"),(3,"ww",20,"beijing"),(2,"ls",19,"shanghai"),(4,"ml",21,"shanghai")
  """.stripMargin)

spark.sql("select * from hadoop_prod.default.partition_tbl").show()
The query returns the four inserted rows.
When creating Iceberg partitioned tables, you can also apply transform expressions to a timestamp column to create hidden partitions. The common transform expressions are as follows:
- years(ts): partition by year
// Create the partitioned table partition_tbl1, partitioned by year
spark.sql(
"""
|create table if not exists hadoop_prod.default.partition_tbl1(id int ,name string,age int,regist_ts timestamp) using iceberg
|partitioned by (years(regist_ts))
""".stripMargin)
// Note: the inserted rows must be grouped by partition value in advance; rows that fall in the same partition must be written together
//(1,'zs',18,1608469830) --"2020-12-20 21:10:30"
//(2,'ls',19,1634559630) --"2021-10-18 20:20:30"
//(3,'ww',20,1603096230) --"2020-10-19 16:30:30"
//(4,'ml',21,1639920630) --"2021-12-19 21:30:30"
//(5,'tq',22,1608279630) --"2020-12-18 16:20:30"
//(6,'gb',23,1576843830) --"2019-12-20 20:10:30"
spark.sql(
"""
|insert into hadoop_prod.default.partition_tbl1 values
|(1,'zs',18,cast(1608469830 as timestamp)),
|(3,'ww',20,cast(1603096230 as timestamp)),
|(5,'tq',22,cast(1608279630 as timestamp)),
|(2,'ls',19,cast(1634559630 as timestamp)),
|(4,'ml',21,cast(1639920630 as timestamp)),
|(6,'gb',23,cast(1576843830 as timestamp))
""".stripMargin)
// Query results
spark.sql(
"""
|select * from hadoop_prod.default.partition_tbl1
""".stripMargin).show()
The query returns the six inserted rows; on HDFS the data is partitioned by year.
- months(ts): month-level partitioning by "year-month"
// Create the partitioned table partition_tbl2, partitioned by months, i.e. by "year-month"
spark.sql(
"""
|create table if not exists hadoop_prod.default.partition_tbl2(id int ,name string,age int,regist_ts timestamp) using iceberg
|partitioned by (months(regist_ts))
""".stripMargin)
// Note: the inserted rows must be grouped by partition value in advance; rows that fall in the same partition must be written together
//(1,'zs',18,1608469830) --"2020-12-20 21:10:30"
//(2,'ls',19,1634559630) --"2021-10-18 20:20:30"
//(3,'ww',20,1603096230) --"2020-10-19 16:30:30"
//(4,'ml',21,1639920630) --"2021-12-19 21:30:30"
//(5,'tq',22,1608279630) --"2020-12-18 16:20:30"
//(6,'gb',23,1576843830) --"2019-12-20 20:10:30"
spark.sql(
"""
|insert into hadoop_prod.default.partition_tbl2 values
|(1,'zs',18,cast(1608469830 as timestamp)),
|(5,'tq',22,cast(1608279630 as timestamp)),
|(2,'ls',19,cast(1634559630 as timestamp)),
|(3,'ww',20,cast(1603096230 as timestamp)),
|(4,'ml',21,cast(1639920630 as timestamp)),
|(6,'gb',23,cast(1576843830 as timestamp))
""".stripMargin)
// Query results
spark.sql(
"""
|select * from hadoop_prod.default.partition_tbl2
""".stripMargin).show()
The query returns the six inserted rows; on HDFS the data is partitioned by "year-month".
- days(ts) or date(ts): day-level partitioning by "year-month-day"
// Create the partitioned table partition_tbl3, partitioned by days, i.e. by "year-month-day"
spark.sql(
"""
|create table if not exists hadoop_prod.default.partition_tbl3(id int ,name string,age int,regist_ts timestamp) using iceberg
|partitioned by (days(regist_ts))
""".stripMargin)
// Note: the inserted rows must be grouped by partition value in advance; rows that fall in the same partition must be written together
//(1,'zs',18,1608469830) --"2020-12-20 21:10:30"
//(2,'ls',19,1634559630) --"2021-10-18 20:20:30"
//(3,'ww',20,1603096230) --"2020-10-19 16:30:30"
//(4,'ml',21,1639920630) --"2021-12-19 21:30:30"
//(5,'tq',22,1608279630) --"2020-12-18 16:20:30"
//(6,'gb',23,1576843830) --"2019-12-20 20:10:30"
spark.sql(
"""
|insert into hadoop_prod.default.partition_tbl3 values
|(1,'zs',18,cast(1608469830 as timestamp)),
|(5,'tq',22,cast(1608279630 as timestamp)),
|(2,'ls',19,cast(1634559630 as timestamp)),
|(3,'ww',20,cast(1603096230 as timestamp)),
|(4,'ml',21,cast(1639920630 as timestamp)),
|(6,'gb',23,cast(1576843830 as timestamp))
""".stripMargin)
// Query results
spark.sql(
"""
|select * from hadoop_prod.default.partition_tbl3
""".stripMargin).show()
The query returns the six inserted rows; on HDFS the data is partitioned by "year-month-day".
- hours(ts) or date_hour(ts): hour-level partitioning by "year-month-day-hour"
// Create the partitioned table partition_tbl4, partitioned by hours, i.e. by "year-month-day-hour"
spark.sql(
"""
|create table if not exists hadoop_prod.default.partition_tbl4(id int ,name string,age int,regist_ts timestamp) using iceberg
|partitioned by (hours(regist_ts))
""".stripMargin)
// Note: the inserted rows must be grouped by partition value in advance; rows that fall in the same partition must be written together
//(1,'zs',18,1608469830) --"2020-12-20 21:10:30"
//(2,'ls',19,1634559630) --"2021-10-18 20:20:30"
//(3,'ww',20,1603096230) --"2020-10-19 16:30:30"
//(4,'ml',21,1639920630) --"2021-12-19 21:30:30"
//(5,'tq',22,1608279630) --"2020-12-18 16:20:30"
//(6,'gb',23,1576843830) --"2019-12-20 20:10:30"
spark.sql(
"""
|insert into hadoop_prod.default.partition_tbl4 values
|(1,'zs',18,cast(1608469830 as timestamp)),
|(5,'tq',22,cast(1608279630 as timestamp)),
|(2,'ls',19,cast(1634559630 as timestamp)),
|(3,'ww',20,cast(1603096230 as timestamp)),
|(4,'ml',21,cast(1639920630 as timestamp)),
|(6,'gb',23,cast(1576843830 as timestamp))
""".stripMargin)
// Query results
spark.sql(
"""
|select * from hadoop_prod.default.partition_tbl4
""".stripMargin).show()
The query returns the six inserted rows; on HDFS the data is partitioned by "year-month-day-hour".
Iceberg time-based partitioning currently supports only UTC. UTC is Coordinated Universal Time; UTC+8 (the East 8 time zone, i.e. Beijing time) is UTC plus eight hours, which is why the partition values above do not match the local times shown in the data.
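As a quick illustration (a sketch added here, not from the original example), the epoch second 1608469830 used above can be printed in both UTC and Asia/Shanghai with the standard java.time API; the time partition transforms derive their values from the UTC reading:

import java.time.{Instant, ZoneId, ZoneOffset}

val instant = Instant.ofEpochSecond(1608469830L)
// Partition values are derived from the UTC timestamp ...
println(instant.atZone(ZoneOffset.UTC))              // 2020-12-20T13:10:30Z
// ... while the comments in the data above show the Beijing (UTC+8) local time
println(instant.atZone(ZoneId.of("Asia/Shanghai")))  // 2020-12-20T21:10:30+08:00[Asia/Shanghai]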
Besides the common time-based hidden partitions above, Iceberg also supports bucket(N, col) partitioning, which assigns a row to a partition based on the hash of the column value modulo N, and truncate(L, col) partitioning, which truncates the column value to length L so that values sharing the same prefix land in the same partition. A minimal sketch of both is shown below.
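The following is a minimal sketch of both transforms, assuming the truncate(L, col) argument order described above; the table names bucket_tbl and truncate_tbl are made up here for illustration and do not appear elsewhere in this article:

// Hash-bucket partitioning: rows are assigned to one of 16 buckets by hash(id) % 16
spark.sql(
  """
    |create table if not exists hadoop_prod.default.bucket_tbl(id int,name string,age int) using iceberg
    |partitioned by (bucket(16, id))
  """.stripMargin)

// Truncate partitioning: rows sharing the first 2 characters of name land in the same partition
spark.sql(
  """
    |create table if not exists hadoop_prod.default.truncate_tbl(id int,name string,age int) using iceberg
    |partitioned by (truncate(2, name))
  """.stripMargin)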
2. CREATE TABLE ... AS SELECT
Iceberg supports the "create table ... as select" syntax, which creates a table from a query statement and inserts the corresponding data. The operation is as follows:
2.1 Create the table hadoop_prod.default.mytbl and insert data
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
  // Register a Hadoop catalog; the catalog name is hadoop_prod
  .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
  .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
  .getOrCreate()
// Create a normal table
spark.sql(
"""
| create table hadoop_prod.default.mytbl(id int,name string,age int) using iceberg
""".stripMargin)
// Insert data into table
spark.sql(
"""
|insert into table hadoop_prod.default.mytbl values (1,"zs",18),(3,"ww",20),(2,"ls",19),(4,"ml",21)
""".stripMargin)
// Query data
spark.sql("select * from hadoop_prod.default.mytbl").show()
2.2 Create the table mytbl2 with "create table ... as select" and query it
spark.sql(
"""
|create table hadoop_prod.default.mytbl2 using iceberg as select id,name,age from hadoop_prod.default.mytbl
""".stripMargin)
spark.sql(
"""
|select * from hadoop_prod.default.mytbl2
""".stripMargin).show()
The query returns the four rows copied from mytbl.
3. REPLACE TABLE ... AS SELECT
Iceberg supports the "replace table ... as select" syntax, which rebuilds a table from a query statement and inserts the corresponding data. The operation is as follows:
3.1 Create the table "hadoop_prod.default.mytbl3", insert data, and display it
spark.sql(
"""
|create table hadoop_prod.default.mytbl3 (id int,name string,loc string,score int) using iceberg
""".stripMargin)
spark.sql(
"""
|insert into table hadoop_prod.default.mytbl3 values (1,"zs","beijing",100),(2,"ls","shanghai",200)
""".stripMargin)
spark.sql(
"""
|select * from hadoop_prod.default.mytbl3
""".stripMargin).show
3.2 Rebuild the table "hadoop_prod.default.mytbl2" from mytbl3 and insert the corresponding data
spark.sql(
"""
|replace table hadoop_prod.default.mytbl2 using iceberg as select * from hadoop_prod.default.mytbl3
""".stripMargin)
spark.sql(
"""
|select * from hadoop_prod.default.mytbl2
""".stripMargin).show()
4. DROP TABLE
To delete an Iceberg table, simply execute a "drop table xxx" statement. When a table is dropped, its data is deleted, but the database directory remains.
// Delete table
spark.sql(
"""
|drop table hadoop_prod.default.mytbl
""".stripMargin)
5. ALTER TABLE
Iceberg's alter operations are supported in Spark 3.x and generally include the following:
- Add and drop columns
Add column: ALTER TABLE ... ADD COLUMN
Drop column: ALTER TABLE ... DROP COLUMN
//1. Create the table test, insert data, and query it
spark.sql(
"""
|create table hadoop_prod.default.test(id int,name string,age int) using iceberg
""".stripMargin)
spark.sql(
"""
|insert into table hadoop_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)
""".stripMargin)
spark.sql(
"""
| select * from hadoop_prod.default.test
""".stripMargin).show()
//2. Add fields: add the gender and loc columns to the test table
spark.sql(
"""
|alter table hadoop_prod.default.test add column gender string,loc string
""".stripMargin)
//3. Drop a field: drop the age column from the test table
spark.sql(
"""
|alter table hadoop_prod.default.test drop column age
""".stripMargin)
//4. View the test table data
spark.sql(
"""
|select * from hadoop_prod.default.test
""".stripMargin).show()
The final table no longer has the age column and now shows the gender and loc columns.
- Rename columns
Rename column syntax: ALTER TABLE ... RENAME COLUMN. The operation is as follows:
//5. Rename a column
spark.sql(
"""
|alter table hadoop_prod.default.test rename column gender to xxx
|
""".stripMargin)
spark.sql(
"""
|select * from hadoop_prod.default.test
""".stripMargin).show()
In the final table, the gender column has been renamed to xxx.
6. ALTER TABLE partition operations
The alter partition operations include adding and dropping partition fields. They are supported from Spark 3.x onward (Spark 2.4 does not support them), and they require setting the spark.sql.extensions property in the Spark configuration to org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions. Partition transforms are also supported when adding a partition field. The syntax is as follows:
- Add partition field: ALTER TABLE ... ADD PARTITION FIELD
- Drop partition field: ALTER TABLE ... DROP PARTITION FIELD
The specific operations are as follows:
6.1 Create the table mytbl and insert data
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
  // Register a Hadoop catalog; the catalog name is hadoop_prod
  .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
  .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
  // Required for ALTER TABLE ... ADD/DROP PARTITION FIELD
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .getOrCreate()
//1. Create a normal table
spark.sql(
"""
| create table hadoop_prod.default.mytbl(id int,name string,loc string,ts timestamp) using iceberg
""".stripMargin)
//2. Insert data into the table and query it
spark.sql(
"""
|insert into hadoop_prod.default.mytbl values
|(1,'zs',"beijing",cast(1608469830 as timestamp)),
|(3,'ww',"shanghai",cast(1603096230 as timestamp))
""".stripMargin)
spark.sql("select * from hadoop_prod.default.mytbl").show()
The query returns the two inserted rows; at this point the data on HDFS is not partitioned.
6.2 Add the loc column as a partition column, insert data, and query
//3. Add the loc column as a partition field; this requires the config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") setting shown above
spark.sql(
"""
|alter table hadoop_prod.default.mytbl add partition field loc
""".stripMargin)
//4. Continue inserting data into mytbl; data written before this point is unpartitioned, data written afterwards is partitioned by loc
spark.sql(
"""
|insert into hadoop_prod.default.mytbl values
|(5,'tq',"hangzhou",cast(1608279630 as timestamp)),
|(2,'ls',"shandong",cast(1634559630 as timestamp))
""".stripMargin )
spark.sql("select * from hadoop_prod.default.mytbl").show()
The query now returns four rows; on HDFS the newly inserted data is written into loc partition directories, while the earlier data remains unpartitioned.
Note: adding a partition field is a metadata operation and does not change existing table data. New data is written with the new partition layout, while existing data keeps its original layout; the sketch below shows one way to inspect this.
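One way to observe this (a hedged sketch, assuming the Iceberg metadata tables are reachable through this Spark catalog) is to query the table's files metadata table, where each data file reports the partition values it was written with; files written before the ALTER show no loc value:

// Inspect per-file partition values via the Iceberg "files" metadata table
spark.sql(
  """
    |select file_path, partition from hadoop_prod.default.mytbl.files
  """.stripMargin).show(false)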
6.3 Add the ts column as a partition column via a partition transform, insert data, and query
//5. Add the ts column as a partition field through the years() partition transform
spark.sql(
"""
|alter table hadoop_prod.default.mytbl add partition field years(ts)
""".stripMargin)
//6. Continue inserting data into mytbl; data written afterwards is additionally partitioned by years(ts)
spark.sql(
"""
|insert into hadoop_prod.default.mytbl values
|(4,'ml',"beijing",cast(1639920630 as timestamp)),
|(6,'gb',"tianjin",cast(1576843830 as timestamp))
""".stripMargin )
spark.sql("select * from hadoop_prod.default.mytbl").show()
The query now returns six rows; on HDFS the newly inserted data is additionally partitioned by years(ts).
6.4 Drop the loc partition field
//7. Drop the loc partition field from the mytbl table
spark.sql(
"""
|alter table hadoop_prod.default.mytbl drop partition field loc
""".stripMargin)
//8. Continue inserting data into the mytbl table and query it
spark.sql(
"""
|insert into hadoop_prod.default.mytbl values
|(4,'ml',"beijing",cast(1639920630 as timestamp)),
|(6,'gb',"tianjin",cast(1576843830 as timestamp))
""".stripMargin )
spark.sql("select * from hadoop_prod.default.mytbl").show()
The query now returns eight rows; on HDFS the newly inserted data is no longer partitioned by loc.
Note: because the partition derived from the ts transform still exists, the loc partition value of the newly inserted data is null.
6.5 Drop the years(ts) partition field
//9. Drop the years(ts) partition field from the mytbl table
spark.sql(
"""
|alter table hadoop_prod.default.mytbl drop partition field years(ts)
""".stripMargin)
//10. Continue inserting data into the mytbl table and query it
spark.sql(
"""
|insert into hadoop_prod.default.mytbl values
|(5,'tq',"hangzhou",cast(1608279630 as timestamp)),
|(2,'ls',"shandong",cast(1634559630 as timestamp))
""".stripMargin )
spark.sql("select * from hadoop_prod.default.mytbl").show()
The query now returns ten rows; on HDFS the newly inserted data is no longer partitioned by years(ts) either.
- Blog home page: https://lansonli.blog.csdn.net
- This article was originally written by Lansonli and first published on the CSDN blog.
- When you stop to rest, don't forget that others are still running; I hope you make the most of your time and go all out for a better life.