Data Lake (XII): Integration of Spark 3.1.2 and Iceberg 0.12.1
2022-07-02 19:38:00 【Lansonli】
Table of Contents
Spark 3.1.2 and Iceberg 0.12.1 Integration
I. Import dependencies into the pom file
II. Configure the catalog in SparkSQL
III. Use Hive Catalog to manage Iceberg tables
IV. Use Hadoop Catalog to manage Iceberg tables
    4. Create a corresponding Hive table to map the data
Spark 3.1.2 and Iceberg 0.12.1 Integration
Spark can operate on Iceberg data lakes. The Iceberg version used here is 0.12.1, which is compatible with Spark 2.4. However, operating Iceberg from Spark 2.4 does not support DDL, adding partitions and partition transforms, Iceberg metadata queries, insert into/overwrite, and similar operations, so it is recommended to integrate Iceberg 0.12.1 with Spark 3.x. The Spark version used here is 3.1.2.
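As a quick illustration of the Spark 3.x-only capabilities listed above, here is a minimal sketch (not from the original article) that creates an Iceberg table partitioned with the days() transform. It assumes the hadoop_prod catalog and the SparkSession configured later in this post; the table name log_events is only a placeholder:
// Sketch only: DDL with a partition transform, which is not supported when using Spark 2.4 with Iceberg.
// Assumes the "hadoop_prod" catalog configured in section II below.
spark.sql(
  """
    |create table if not exists hadoop_prod.default.log_events (
    |  id bigint,
    |  data string,
    |  ts timestamp
    |) using iceberg
    |partitioned by (days(ts))
  """.stripMargin)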
I. Import dependencies into the pom file
Create a Maven project in IDEA and import the following key dependencies into the pom file:
<!-- Configure the following properties to fix the packaging error "-source 1.5 does not support lambda expressions" under JDK 1.8 -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- Spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Dependencies for integrating Spark with Iceberg -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark3</artifactId>
<version>0.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark3-runtime</artifactId>
<version>0.12.1</version>
</dependency>
<!-- Avro format dependency -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.2</version>
</dependency>
<!-- Parquet format dependency -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.12.0</version>
</dependency>
<!-- SparkSQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- SparkSQL ON Hive-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- MySQL JDBC driver (optional)
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
-->
<!--SparkStreaming-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- SparkStreaming + Kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Dependency for producing data to Kafka (optional).
The "provided" scope makes the jar available for compile and test only, and non-transitive.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
<scope>provided</scope>
</dependency>
-->
<!-- Structured Streaming + Kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Scala package -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.14</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.12.14</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.12.14</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.12</version>
</dependency>
<dependency>
<groupId>com.google.collections</groupId>
<artifactId>google-collections</artifactId>
<version>1.0</version>
</dependency>
</dependencies>

II. Configure the catalog in SparkSQL
The following operations mainly use SparkSQL to work with Iceberg. Spark supports two kinds of catalog configuration: hive and hadoop. With a Hive Catalog, Iceberg tables are stored under Hive's default warehouse path; with a Hadoop Catalog, you need to specify the storage path for the Iceberg-format tables.
Specify the catalogs in the SparkSQL code:
val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
// Specify the hive catalog; the catalog name is hive_prod
.config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hive_prod.type", "hive")
.config("spark.sql.catalog.hive_prod.uri", "thrift://node1:9083")
.config("iceberg.engine.hive.enabled", "true")
// Specify the hadoop catalog; the catalog name is hadoop_prod
.config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_prod.type", "hadoop")
.config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
.getOrCreate()

III. Use Hive Catalog to manage Iceberg tables
When the Hive Catalog is used to manage Iceberg tables, the table data is stored by default under Hive's warehouse directory, and the corresponding Iceberg table appears in Hive. SparkSQL acts as a Hive client here, so the property "iceberg.engine.hive.enabled" must additionally be set to true; otherwise no data can be queried from the corresponding Iceberg-format table in Hive.
1. Create a table
// Create a table. hive_prod: the catalog name; default: an existing Hive database; test: the name of the Iceberg table to create.
spark.sql(
"""
| create table if not exists hive_prod.default.test(id int,name string,age int) using iceberg
""".stripMargin)

Note:
1) When creating the table, the table name format is: ${catalog name}.${Hive database name}.${Iceberg table name}
2) After the table is created, the corresponding test table can be seen in Hive; it is created as a Hive external table, and the corresponding data directory can be found under the Hive warehouse directory.

2. Insert data
// insert data
spark.sql(
"""
|insert into hive_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)
""".stripMargin)

3. Query the data
// Query data
spark.sql(
"""
|select * from hive_prod.default.test
""".stripMargin).show()

The query returns the three rows inserted above.

The data can also be queried from the corresponding test table in Hive.
4. Drop the table
// Drop the table (see the note below about what happens to the underlying data)
spark.sql(
"""
|drop table hive_prod.default.test
""".stripMargin)

Note: after the table is dropped, its data is deleted, but the table directory still exists. To remove the data completely, delete the corresponding table directory as well (see the sketch below).
IV. Use Hadoop Catalog to manage Iceberg tables
When using a Hadoop Catalog to manage tables, you need to specify the directory where the corresponding Iceberg data is stored.
1. Create a table
// Create a table. hadoop_prod: the Hadoop catalog name; default: the database name; test: the name of the Iceberg table to create.
spark.sql(
"""
| create table if not exists hadoop_prod.default.test(id int,name string,age int) using iceberg
""".stripMargin)

Note:
1) The table name format is: ${Hadoop catalog name}.${user-defined database name}.${Iceberg table name}
2) When the table is created, a table directory is created under the warehouse directory corresponding to the hadoop_prod catalog (see the layout sketch below).
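For reference (not shown in the original article), a Hadoop-catalog Iceberg table typically ends up with a directory layout roughly like the following under the configured warehouse path; the data directory is populated once data is inserted:
hdfs://mycluster/sparkoperateiceberg/
  default/
    test/
      metadata/   (table metadata JSON files, manifest lists and manifests)
      data/       (Parquet data files written by inserts)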

2. Insert data
// insert data
spark.sql(
"""
|insert into hadoop_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)
""".stripMargin)

3. Query the data
spark.sql(
"""
|select * from hadoop_prod.default.test
""".stripMargin).show() 
4. Create a corresponding Hive table to map the data
Execute the following CREATE TABLE statement in Hive:
CREATE TABLE hdfs_iceberg (
id int,
name string,
age int
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/sparkoperateiceberg/default/test'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');

Querying the "hdfs_iceberg" table in Hive returns the data inserted above.

5. Drop the table
spark.sql(
"""
|drop table hadoop_prod.default.test
""".stripMargin)

Note: after the Iceberg table is dropped, its data is deleted, but the corresponding database directory still exists.
- Written by Lansonli; originally published on CSDN: https://lansonli.blog.csdn.net