
Data Lake (13): DDL Operations with Spark and Iceberg

2022-07-04 14:28:00 Lansonli

Contents

Spark and Iceberg Integration: DDL Operations

One. CREATE TABLE

Two. CREATE TABLE ... AS SELECT

1. Create table hadoop_prod.default.mytbl and insert data

2. Use the "create table ... as select" syntax to create table mytbl2 and query it

Three. REPLACE TABLE ... AS SELECT

1. Create table "hadoop_prod.default.mytbl3", insert data, and display it

2. Rebuild table "hadoop_prod.default.mytbl2" from the data in mytbl3

Four. DROP TABLE

Five. ALTER TABLE

Six. ALTER TABLE Partition Operations

1. Create table mytbl and insert data

2. Add the loc column as a partition field, insert data, and query

3. Add the ts column as a partition field through a transform, insert data, and query

4. Drop partition field loc

5. Drop partition field years(ts)
Spark and Iceberg Integration: DDL Operations

This article uses a Hadoop catalog to demonstrate Iceberg DDL operations from Spark.

One. CREATE TABLE

CREATE TABLE creates an Iceberg table. You can create ordinary tables as well as partitioned tables. When inserting a batch of data into a partitioned table, the rows must be sorted by the partition column; otherwise a "files already closed" error is raised. The code is as follows:

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
  // Configure a Hadoop catalog named hadoop_prod
  .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
  .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
  .getOrCreate()

// Create a normal (unpartitioned) table
spark.sql("""create table if not exists hadoop_prod.default.normal_tbl(id int,name string,age int) using iceberg""")

// Create a partitioned table with loc as the partition column
spark.sql("""create table if not exists hadoop_prod.default.partition_tbl(id int,name string,age int,loc string) using iceberg partitioned by (loc)""")

// When inserting into a partitioned table, rows must be sorted by the partition column; otherwise the write fails with:
// java.lang.IllegalStateException: Already closed files for partition:xxx
spark.sql("""insert into table hadoop_prod.default.partition_tbl values (1,"zs",18,"beijing"),(3,"ww",20,"beijing"),(2,"ls",19,"shanghai"),(4,"ml",21,"shanghai")""")
spark.sql("select * from hadoop_prod.default.partition_tbl").show()

[Figure: query results]
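The sorting requirement described above can be met before the statement is even built. The following is a minimal sketch in plain Scala, assuming the rows are available in memory. `Person`, `SortedInsert`, `clusterByLoc`, and `insertSql` are hypothetical names introduced for illustration; they are not part of Spark or Iceberg. The sketch orders rows by the partition column loc, so that rows with the same partition value are adjacent, and then renders the INSERT statement.

```scala
// Hypothetical row type for illustration; mirrors the columns of partition_tbl.
final case class Person(id: Int, name: String, age: Int, loc: String)

object SortedInsert {
  /** Orders rows so that rows sharing a partition value (loc) are adjacent.
    * sortBy is stable, so the original order is kept within each loc. */
  def clusterByLoc(rows: Seq[Person]): Seq[Person] = rows.sortBy(_.loc)

  /** Renders an INSERT statement whose VALUES list is clustered by loc.
    * No SQL escaping is done here; this is an illustration only. */
  def insertSql(table: String, rows: Seq[Person]): String = {
    val values = clusterByLoc(rows)
      .map(p => s"(${p.id},'${p.name}',${p.age},'${p.loc}')")
      .mkString(",")
    s"insert into table $table values $values"
  }
}
```

The returned string could then be passed to spark.sql(...) in the same way as the hand-written statement above.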

When creating Iceberg partitions, you can also apply transform expressions to a timestamp column to create hidden partitions. Common transform expressions are as follows:

  • years(ts): partition by year
// Create partitioned table partition_tbl1, partitioned by year
spark.sql(
  """
    |create table if not exists hadoop_prod.default.partition_tbl1(id int ,name string,age int,regist_ts timestamp) using iceberg
    |partitioned by (years(regist_ts))
  """.stripMargin)

// Insert data into the table. Note: the data must be ordered in advance; it is enough that rows falling into the same partition are written together
//(1,'zs',18,1608469830) --"2020-12-20 21:10:30"
//(2,'ls',19,1634559630) --"2021-10-18 20:20:30"
//(3,'ww',20,1603096230) --"2020-10-19 16:30:30"
//(4,'ml',21,1639920630) --"2021-12-19 21:30:30"
//(5,'tq',22,1608279630) --"2020-12-18 16:20:30"
//(6,'gb',23,1576843830) --"2019-12-20 20:10:30"
spark.sql(
  """
    |insert into hadoop_prod.default.partition_tbl1 values
    |(1,'zs',18,cast(1608469830 as timestamp)),
    |(3,'ww',20,cast(1603096230 as timestamp)),
    |(5,'tq',22,cast(1608279630 as timestamp)),
    |(2,'ls',19,cast(1634559630 as timestamp)),
    |(4,'ml',21,cast(1639920630 as timestamp)),
    |(6,'gb',23,cast(1576843830 as timestamp))
  """.stripMargin)

// Query the results
spark.sql("""select * from hadoop_prod.default.partition_tbl1""").show()

[Figure: query results]

In HDFS, the data is partitioned by year.

  • months(ts): month-level partitions, by "year-month"
// Create partitioned table partition_tbl2, partitioned by months, that is, by "year-month"
spark.sql(
  """
    |create table if not exists hadoop_prod.default.partition_tbl2(id int ,name string,age int,regist_ts timestamp) using iceberg
    |partitioned by (months(regist_ts))
  """.stripMargin)

// Insert data into the table. Note: the data must be ordered in advance; it is enough that rows falling into the same partition are written together
//(1,'zs',18,1608469830) --"2020-12-20 21:10:30"
//(2,'ls',19,1634559630) --"2021-10-18 20:20:30"
//(3,'ww',20,1603096230) --"2020-10-19 16:30:30"
//(4,'ml',21,1639920630) --"2021-12-19 21:30:30"
//(5,'tq',22,1608279630) --"2020-12-18 16:20:30"
//(6,'gb',23,1576843830) --"2019-12-20 20:10:30"
spark.sql(
  """
    |insert into hadoop_prod.default.partition_tbl2 values
    |(1,'zs',18,cast(1608469830 as timestamp)),
    |(5,'tq',22,cast(1608279630 as timestamp)),
    |(2,'ls',19,cast(1634559630 as timestamp)),
    |(3,'ww',20,cast(1603096230 as timestamp)),
    |(4,'ml',21,cast(1639920630 as timestamp)),
    |(6,'gb',23,cast(1576843830 as timestamp))
  """.stripMargin)

// Query the results
spark.sql("""select * from hadoop_prod.default.partition_tbl2""").show()

[Figure: query results]

In HDFS, the data is partitioned by "year-month".

  • days(ts) or date(ts): day-level partitions, by "year-month-day"
// Create partitioned table partition_tbl3, partitioned by days, that is, by "year-month-day"
spark.sql(
  """
    |create table if not exists hadoop_prod.default.partition_tbl3(id int ,name string,age int,regist_ts timestamp) using iceberg
    |partitioned by (days(regist_ts))
  """.stripMargin)

// Insert data into the table. Note: the data must be ordered in advance; it is enough that rows falling into the same partition are written together
//(1,'zs',18,1608469830) --"2020-12-20 21:10:30"
//(2,'ls',19,1634559630) --"2021-10-18 20:20:30"
//(3,'ww',20,1603096230) --"2020-10-19 16:30:30"
//(4,'ml',21,1639920630) --"2021-12-19 21:30:30"
//(5,'tq',22,1608279630) --"2020-12-18 16:20:30"
//(6,'gb',23,1576843830) --"2019-12-20 20:10:30"
spark.sql(
  """
    |insert into hadoop_prod.default.partition_tbl3 values
    |(1,'zs',18,cast(1608469830 as timestamp)),
    |(5,'tq',22,cast(1608279630 as timestamp)),
    |(2,'ls',19,cast(1634559630 as timestamp)),
    |(3,'ww',20,cast(1603096230 as timestamp)),
    |(4,'ml',21,cast(1639920630 as timestamp)),
    |(6,'gb',23,cast(1576843830 as timestamp))
  """.stripMargin)

// Query the results
spark.sql("""select * from hadoop_prod.default.partition_tbl3""").show()

[Figure: query results]

In HDFS, the data is partitioned by "year-month-day".


  • hours(ts) or date_hour(ts): hour-level partitions, by "year-month-day-hour"
// Create partitioned table partition_tbl4, partitioned by hours, that is, by "year-month-day-hour"
spark.sql(
  """
    |create table if not exists hadoop_prod.default.partition_tbl4(id int ,name string,age int,regist_ts timestamp) using iceberg
    |partitioned by (hours(regist_ts))
  """.stripMargin)

// Insert data into the table. Note: the data must be ordered in advance; it is enough that rows falling into the same partition are written together
//(1,'zs',18,1608469830) --"2020-12-20 21:10:30"
//(2,'ls',19,1634559630) --"2021-10-18 20:20:30"
//(3,'ww',20,1603096230) --"2020-10-19 16:30:30"
//(4,'ml',21,1639920630) --"2021-12-19 21:30:30"
//(5,'tq',22,1608279630) --"2020-12-18 16:20:30"
//(6,'gb',23,1576843830) --"2019-12-20 20:10:30"
spark.sql(
  """
    |insert into hadoop_prod.default.partition_tbl4 values
    |(1,'zs',18,cast(1608469830 as timestamp)),
    |(5,'tq',22,cast(1608279630 as timestamp)),
    |(2,'ls',19,cast(1634559630 as timestamp)),
    |(3,'ww',20,cast(1603096230 as timestamp)),
    |(4,'ml',21,cast(1639920630 as timestamp)),
    |(6,'gb',23,cast(1576843830 as timestamp))
  """.stripMargin)

// Query the results
spark.sql("""select * from hadoop_prod.default.partition_tbl4""").show()

[Figure: query results]

In HDFS, the data is partitioned by "year-month-day-hour".

Iceberg's time-based partitions support only UTC, both now and for the foreseeable future. UTC is Coordinated Universal Time; UTC+8 is UTC plus eight hours, the time of the East Eight time zone, that is, Beijing time. This is why the partition times above do not match the local times of the data.
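The offset can be checked with plain java.time, without Spark. The sketch below takes the epoch value of row id=1 from the examples above and formats it at hour granularity in both UTC and Beijing time. `UtcPartitionDemo` and `hourLabel` are hypothetical names introduced for illustration, and the label format is an assumption chosen for readability rather than a statement about Iceberg's exact directory names.

```scala
import java.time.{Instant, ZoneId, ZoneOffset}
import java.time.format.DateTimeFormatter

object UtcPartitionDemo {
  // Assumed label format: year-month-day-hour
  private val hourFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH")

  /** Formats epoch seconds as an hour-granularity label in the given zone. */
  def hourLabel(epochSeconds: Long, zone: ZoneId): String =
    hourFmt.format(Instant.ofEpochSecond(epochSeconds).atZone(zone))

  def main(args: Array[String]): Unit = {
    val ts = 1608469830L // row id=1, listed as "2020-12-20 21:10:30" in the comments above
    println(hourLabel(ts, ZoneOffset.UTC))              // prints 2020-12-20-13
    println(hourLabel(ts, ZoneId.of("Asia/Shanghai")))  // prints 2020-12-20-21
  }
}
```

The same instant falls into hour 13 in UTC and hour 21 in Beijing time, which is the eight-hour difference visible in the hour-level partitions.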

Besides the commonly used time-based hidden partitions above, Iceberg also supports bucket(N,col) partitioning, which assigns each row to a partition by taking the hash of the column value modulo N. It also supports truncate(L,col), a hidden partition that truncates a string to length L; rows with the same truncated value go to the same partition.
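The two transforms can be illustrated in plain Scala. This is a sketch of the idea only: `HiddenPartitionSketch` and its methods are hypothetical names, and the bucket function uses String.hashCode as a stand-in, whereas Iceberg itself uses a 32-bit Murmur3 hash, so real bucket numbers will differ.

```scala
object HiddenPartitionSketch {
  /** truncate(L, col) for strings: keep the first L characters. */
  def truncate(length: Int, value: String): String = value.take(length)

  /** bucket(N, col) shape: non-negative hash modulo N.
    * Stand-in hash (String.hashCode), not Iceberg's Murmur3. */
  def bucket(n: Int, value: String): Int = (value.hashCode & Int.MaxValue) % n

  def main(args: Array[String]): Unit = {
    val locs = Seq("beijing", "beihai", "shanghai")
    // Rows whose truncated value is equal share a partition.
    println(locs.groupBy(truncate(2, _)))
    // Every value maps to a bucket number in the range 0 until N.
    println(locs.map(l => l -> bucket(4, l)))
  }
}
```

With truncate(2, loc), "beijing" and "beihai" both map to "be" and would share a partition, while "shanghai" maps to "sh".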

Two. CREATE TABLE ... AS SELECT

Iceberg supports the "create table ... as select" syntax, which creates a table from a query and inserts the query result into it. The steps are as follows:

1. Create table hadoop_prod.default.mytbl and insert data

val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
  // Configure a Hadoop catalog named hadoop_prod
  .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
  .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
  .getOrCreate()

// Create a normal table
spark.sql("""create table hadoop_prod.default.mytbl(id int,name string,age int) using iceberg""")

// Insert data into the table
spark.sql("""insert into table hadoop_prod.default.mytbl values (1,"zs",18),(3,"ww",20),(2,"ls",19),(4,"ml",21)""")

// Query the data
spark.sql("select * from hadoop_prod.default.mytbl").show()

2. Use the "create table ... as select" syntax to create table mytbl2 and query it

spark.sql("""create table hadoop_prod.default.mytbl2 using iceberg as select id,name,age from hadoop_prod.default.mytbl""")
spark.sql("""select * from hadoop_prod.default.mytbl2""").show()

[Figure: query results]



Three. REPLACE TABLE ... AS SELECT

Iceberg supports the "replace table ... as select" syntax, which rebuilds a table from a query and inserts the query result into it. The steps are as follows:

1. Create table "hadoop_prod.default.mytbl3", insert data, and display it

spark.sql("""create table hadoop_prod.default.mytbl3 (id int,name string,loc string,score int) using iceberg""")
spark.sql("""insert into table hadoop_prod.default.mytbl3 values (1,"zs","beijing",100),(2,"ls","shanghai",200)""")
spark.sql("""select * from hadoop_prod.default.mytbl3""").show()



2. Rebuild table "hadoop_prod.default.mytbl2" from the data in mytbl3

spark.sql("""replace table hadoop_prod.default.mytbl2 using iceberg as select * from hadoop_prod.default.mytbl3""")

spark.sql("""select * from hadoop_prod.default.mytbl2""").show()


Four. DROP TABLE

To delete an Iceberg table, simply run a "drop table xxx" statement. Dropping a table deletes the table data, but the database directory remains.

// Drop the table
spark.sql("""drop table hadoop_prod.default.mytbl""")


Five. ALTER TABLE

Iceberg's ALTER operations are supported in Spark 3.x. ALTER generally covers the following operations:

  • Add and drop columns

Add a column: ALTER TABLE ... ADD COLUMN

Drop a column: ALTER TABLE ... DROP COLUMN

//1. Create table test, insert data, and query it
spark.sql("""create table hadoop_prod.default.test(id int,name string,age int) using iceberg""")
spark.sql("""insert into table hadoop_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)""")
spark.sql("""select * from hadoop_prod.default.test""").show()

//2. Add columns: add the gender and loc columns to table test
spark.sql("""alter table hadoop_prod.default.test add column gender string,loc string""")

//3. Drop a column: drop the age column from table test
spark.sql("""alter table hadoop_prod.default.test drop column age""")

//4. Query table test
spark.sql("""select * from hadoop_prod.default.test""").show()

The final table no longer has the age column and has gained the gender and loc columns.


  • Rename a column

Rename syntax: ALTER TABLE ... RENAME COLUMN. The operation is as follows:

//5. Rename a column
spark.sql("""alter table hadoop_prod.default.test rename column gender to xxx""")
spark.sql("""select * from hadoop_prod.default.test""").show()

In the final table, the gender column has become the xxx column.



Six. ALTER TABLE Partition Operations

ALTER partition operations cover adding and dropping partition fields. They are supported from Spark 3.x onward and are not supported in Spark 2.4. To use them, you must set the spark.sql.extensions property in the Spark configuration to org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions. Partition transforms are also supported when adding a partition field. The syntax is as follows:

  • Add a partition field: ALTER TABLE ... ADD PARTITION FIELD
  • Drop a partition field: ALTER TABLE ... DROP PARTITION FIELD

The specific steps are as follows:

1. Create table mytbl and insert data

val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
  // Configure a Hadoop catalog named hadoop_prod
  .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
  .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
  // Required for ALTER TABLE ... ADD/DROP PARTITION FIELD
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .getOrCreate()

//1. Create a normal table
spark.sql("""create table hadoop_prod.default.mytbl(id int,name string,loc string,ts timestamp) using iceberg""")
//2. Insert data into the table and query it
spark.sql(
  """
    |insert into hadoop_prod.default.mytbl values
    |(1,'zs',"beijing",cast(1608469830 as timestamp)),
    |(3,'ww',"shanghai",cast(1603096230 as timestamp))
  """.stripMargin)
spark.sql("select * from hadoop_prod.default.mytbl").show()

[Figure: data layout in HDFS and query results]



2. Add the loc column as a partition field, insert data, and query

//3. Add loc as a partition field. This requires config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
spark.sql("""alter table hadoop_prod.default.mytbl add partition field loc""")

//4. Continue inserting into mytbl: data written earlier is unpartitioned, data written from now on is partitioned
spark.sql(
  """
    |insert into hadoop_prod.default.mytbl values
    |(5,'tq',"hangzhou",cast(1608279630 as timestamp)),
    |(2,'ls',"shandong",cast(1634559630 as timestamp))
  """.stripMargin )
spark.sql("select * from hadoop_prod.default.mytbl").show()

[Figure: data layout in HDFS and query results]

Note: Adding a partition field is a metadata operation. It does not change existing table data: new data is written with the new partitioning, while existing data stays in its original layout.

3. Add the ts column as a partition field through a transform, insert data, and query

//5. Add ts as a partition field through a partition transform
spark.sql("""alter table hadoop_prod.default.mytbl add partition field years(ts)""")

//6. Continue inserting into mytbl: data written from now on is also partitioned by years(ts)
spark.sql(
  """
    |insert into hadoop_prod.default.mytbl values
    |(4,'ml',"beijing",cast(1639920630 as timestamp)),
    |(6,'gb',"tianjin",cast(1576843830 as timestamp))
  """.stripMargin )
spark.sql("select * from hadoop_prod.default.mytbl").show()

[Figure: data layout in HDFS and query results]


4. Drop partition field loc

//7. Drop the loc partition field from table mytbl
spark.sql("""alter table hadoop_prod.default.mytbl drop partition field loc""")
//8. Continue inserting into table mytbl and query
spark.sql(
  """
    |insert into hadoop_prod.default.mytbl values
    |(4,'ml',"beijing",cast(1639920630 as timestamp)),
    |(6,'gb',"tianjin",cast(1576843830 as timestamp))
  """.stripMargin )
spark.sql("select * from hadoop_prod.default.mytbl").show()

[Figure: data layout in HDFS and query results]

Note: Because the table still has the partition derived from the ts transform, data inserted afterwards is written with a null loc partition.
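The effect of these steps on the directory layout can be pictured with a toy model. The sketch below is plain Scala and is not Iceberg's implementation: `Rec`, `PartField`, and `SpecEvolutionSketch` are hypothetical types, and directory names such as loc=null simply follow the behavior described in the notes above, which may differ between Iceberg versions and table format versions. The point it illustrates is that each write uses the partition spec in force at that moment, while earlier files stay where they are.

```scala
// Toy model only: these types are hypothetical and are not Iceberg classes.
final case class Rec(id: Int, loc: String, tsYear: Int)

/** A partition field; active = false models a dropped field that still
  * appears in the path with a null value, as the note above describes. */
final case class PartField(name: String, value: Rec => String, active: Boolean = true)

object SpecEvolutionSketch {
  val loc: PartField  = PartField("loc", _.loc)
  val year: PartField = PartField("ts_year", _.tsYear.toString)

  /** Directory a record would be written to under the given spec. */
  def dataDir(spec: Seq[PartField], r: Rec): String =
    ("data" +: spec.map(f => s"${f.name}=${if (f.active) f.value(r) else "null"}")).mkString("/")

  def main(args: Array[String]): Unit = {
    println(dataDir(Seq.empty, Rec(1, "beijing", 2020)))        // step 1: unpartitioned
    println(dataDir(Seq(loc), Rec(5, "hangzhou", 2020)))        // step 2: loc added
    println(dataDir(Seq(loc, year), Rec(4, "beijing", 2021)))   // step 3: years(ts) added
    println(dataDir(Seq(loc.copy(active = false), year), Rec(4, "beijing", 2021))) // step 4: loc dropped
  }
}
```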


5. Drop partition field years(ts)

//9. Drop the years(ts) partition field from table mytbl
spark.sql("""alter table hadoop_prod.default.mytbl drop partition field years(ts)""")
//10. Continue inserting into table mytbl and query
spark.sql(
  """
    |insert into hadoop_prod.default.mytbl values
    |(5,'tq',"hangzhou",cast(1608279630 as timestamp)),
    |(2,'ls',"shandong",cast(1634559630 as timestamp))
  """.stripMargin )
spark.sql("select * from hadoop_prod.default.mytbl").show()

[Figure: data layout in HDFS and query results]


  • Blog home page: https://lansonli.blog.csdn.net
  • This article is an original work by Lansonli, first published on the CSDN blog

