Using Hudi in IDEA
2022-07-03 09:21:00 【Did Xiao Hu get stronger today】
Environment preparation
- Create a Maven project
- Create a remote connection to the server
Tools → Deployment → Browse Remote Host
Configure it as follows:
Enter the server account and password here.
Click Test Connection; if it reports Successfully, the configuration works.
- Copy Hadoop's core-site.xml, hdfs-site.xml, and log4j.properties into the resources folder.
Configure log4j.properties so that only warnings and errors are printed:
log4j.rootCategory=WARN, console
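The line above only sets the root logger level; it assumes an appender named console is defined in the same file. If your log4j.properties is written from scratch, a minimal sketch could look like this (the appender layout below is an assumption, adjust it as needed):
# Root logger: print only WARN and above to the console appender
log4j.rootCategory=WARN, console
# Console appender definition (assumed layout, adjust as needed)
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n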
- Add the following to the pom.xml file:
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>jboss</id>
<url>http://repository.jboss.com/nexus/content/groups/public</url>
</repository>
</repositories>
<properties>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.0.0</spark.version>
<hadoop.version>2.7.3</hadoop.version>
<hudi.version>0.9.0</hudi.version>
</properties>
<dependencies>
<!-- Scala language dependency -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark Core dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark SQL dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Hadoop client dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- hudi-spark3 -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark3-bundle_2.12</artifactId>
<version>${hudi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/classes</outputDirectory>
<testOutputDirectory>target/test-classes</testOutputDirectory>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
</resources>
<!-- Maven compiler plugin -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Comment out the following properties block generated when the project was created; otherwise the dependencies keep reporting errors:
<!-- <properties>-->
<!-- <maven.compiler.source>8</maven.compiler.source>-->
<!-- <maven.compiler.target>8</maven.compiler.target>-->
<!-- </properties>-->
The code structure:
Core code:
import org.apache.hudi.QuickstartUtils.DataGenerator
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * Hudi data lake framework, based on the Spark compute engine, performing CRUD operations on data.
 * Uses the taxi trip data produced by the official data generator (QuickstartUtils).
 *
 * Task 1: generate data and insert it into a Hudi table, using the COW table type
 * Task 2: snapshot query (Snapshot Query) of the data, using the DSL
 * Task 3: update (Update) the data
 * Task 4: incremental query (Incremental Query) of the data, using SQL
 * Task 5: delete (Delete) data
 */
object HudiSparkDemo {
/** Official example: generate simulated data and insert it into a Hudi table; the table type is COW. */
def insertData(spark: SparkSession, table: String, path: String): Unit = {
import spark.implicits._
// Step 1: generate simulated trip data
import org.apache.hudi.QuickstartUtils._
val dataGen: DataGenerator = new DataGenerator()
val inserts = convertToStringList(dataGen.generateInserts(100))
import scala.collection.JavaConverters._
val insertDF: DataFrame = spark.read.json(
spark.sparkContext.parallelize(inserts.asScala, 2).toDS()
)
// insertDF.printSchema()
// insertDF.show(10, truncate = false)
// Step 2: insert the data into the Hudi table
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
insertDF.write
.mode(SaveMode.Append)
.format("hudi")
.option("hoodie.insert.shuffle.parallelism", 2)
.option("hoodie.insert.shuffle.parallelism", 2)
//Hudi Property settings of the table
.option(PRECOMBINE_FIELD.key(), "ts")
.option(RECORDKEY_FIELD.key(), "uuid")
.option(PARTITIONPATH_FIELD.key(), "partitionpath")
.option(TBL_NAME.key(), table)
.save(path)
}
/** Query table data in snapshot mode (Snapshot Query). */
def queryData(spark: SparkSession, path: String): Unit = {
import spark.implicits._
val tripsDF: DataFrame = spark.read.format("hudi").load(path)
// tripsDF.printSchema()
// tripsDF.show(10, truncate = false)
// Query trips whose fare is between 20 and 50
tripsDF
.filter($"fare" >= 20 && $"fare" <= 50)
.select($"driver", $"rider", $"fare", $"begin_lat", $"begin_lon", $"partitionpath", $"_hoodie_commit_time")
.orderBy($"fare".desc, $"_hoodie_commit_time".desc)
.show(20, truncate = false)
}
def queryDataByTime(spark: SparkSession, path: String):Unit = {
import org.apache.spark.sql.functions._
// Option 1: pass the instant as a compact timestamp string (yyyyMMddHHmmss) and filter by it
val df1 = spark.read
.format("hudi")
.option("as.of.instant", "20220610160908")
.load(path)
.sort(col("_hoodie_commit_time").desc)
df1.printSchema()
df1.show(numRows = 5, truncate = false)
// Option 2: pass the instant as a formatted date string (yyyy-MM-dd HH:mm:ss)
val df2 = spark.read
.format("hudi")
.option("as.of.instant", "2022-06-10 16:09:08")
.load(path)
.sort(col("_hoodie_commit_time").desc)
df2.printSchema()
df2.show(numRows = 5, truncate = false)
}
/** Insert data, taking the DataGenerator as a parameter so the same generator can later produce updates. */
def insertData(spark: SparkSession, table: String, path: String, dataGen: DataGenerator): Unit = {
import spark.implicits._
// Step 1: generate simulated trip data
import org.apache.hudi.QuickstartUtils._
val inserts = convertToStringList(dataGen.generateInserts(100))
import scala.collection.JavaConverters._
val insertDF: DataFrame = spark.read.json(
spark.sparkContext.parallelize(inserts.asScala, 2).toDS()
)
// insertDF.printSchema()
// insertDF.show(10, truncate = false)
// Step 2: insert the data into the Hudi table
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
insertDF.write
// Use Overwrite mode here (instead of Append), so the table is rewritten
.mode(SaveMode.Overwrite)
.format("hudi")
.option("hoodie.insert.shuffle.parallelism", 2)
.option("hoodie.insert.shuffle.parallelism", 2)
//Hudi Property settings of the table
.option(PRECOMBINE_FIELD.key(), "ts")
.option(RECORDKEY_FIELD.key(), "uuid")
.option(PARTITIONPATH_FIELD.key(), "partitionpath")
.option(TBL_NAME.key(), table)
.save(path)
}
/** Generate simulated updates for records already in the Hudi table and write them back. */
def updateData(spark: SparkSession, table: String, path: String, dataGen: DataGenerator):Unit = {
import spark.implicits._
// Step 1: simulate trip data
import org.apache.hudi.QuickstartUtils._
// Generate updated data
val updates = convertToStringList(dataGen.generateUpdates(100))
import scala.collection.JavaConverters._
val updateDF: DataFrame = spark.read.json(
spark.sparkContext.parallelize(updates.asScala, 2).toDS()
)
// Step 2: upsert the data into the Hudi table
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
updateDF.write
// Append mode
.mode(SaveMode.Append)
.format("hudi")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
// Hudi table property settings
.option(PRECOMBINE_FIELD.key(), "ts")
.option(RECORDKEY_FIELD.key(), "uuid")
.option(PARTITIONPATH_FIELD.key(), "partitionpath")
.option(TBL_NAME.key(), table)
.save(path)
}
/** Query data incrementally (Incremental Query); a begin commit timestamp must be specified. */
def incrementalQueryData(spark: SparkSession, path: String): Unit = {
import spark.implicits._
// Step 1: load the Hudi table and collect its commit times, to use as the incremental query threshold
import org.apache.hudi.DataSourceReadOptions._
spark.read
.format("hudi")
.load(path)
.createOrReplaceTempView("view_temp_hudi_trips")
val commits: Array[String] = spark
.sql(
"""
|select
|  distinct(_hoodie_commit_time) as commitTime
|from
|  view_temp_hudi_trips
|order by
|  commitTime DESC
|""".stripMargin
)
.map(row => row.getString(0))
.take(50)
val beginTime = commits(commits.length - 1) // the commit time we are interested in
println(s"beginTime = ${beginTime}")
// Step 2: set the commit-time threshold and run the incremental query
val tripsIncrementalDF = spark.read
.format("hudi")
// Set the query type to incremental read
.option(QUERY_TYPE.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL)
// Set the start time when incrementally reading data
.option(BEGIN_INSTANTTIME.key(), beginTime)
.load(path)
// Step 3: register the incremental result as a temporary view and query trips with fare greater than 20
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
spark
.sql(
"""
|select
|  `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts
|from
|  hudi_trips_incremental
|where
|  fare > 20.0
|""".stripMargin
)
.show(10, truncate = false)
}
/** Delete data from the Hudi table by the primary key uuid; for a partitioned table, the partition path must also be given. */
def deleteData(spark: SparkSession, table: String, path: String): Unit = {
import spark.implicits._
// Step 1: load the Hudi table and count the records
val tripsDF: DataFrame = spark.read.format("hudi").load(path)
println(s"Raw Count = ${
tripsDF.count()}")
// Step 2: simulate the records to delete by taking a couple of rows (uuid, partitionpath) from the Hudi table
val dataframe = tripsDF.limit(2).select($"uuid", $"partitionpath")
import org.apache.hudi.QuickstartUtils._
val dataGenerator = new DataGenerator()
val deletes = dataGenerator.generateDeletes(dataframe.collectAsList())
import scala.collection.JavaConverters._
val deleteDF = spark.read.json(spark.sparkContext.parallelize(deletes.asScala, 2))
// Step 3: write the delete set back to the Hudi table with the operation type DELETE
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
deleteDF.write
.mode(SaveMode.Append)
.format("hudi")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
// Set the write operation type to delete (the default is upsert)
.option(OPERATION.key(), "delete")
.option(PRECOMBINE_FIELD.key(), "ts")
.option(RECORDKEY_FIELD.key(), "uuid")
.option(PARTITIONPATH_FIELD.key(), "partitionpath")
.option(TBL_NAME.key(), table)
.save(path)
// Step 4: reload the Hudi table and count again to confirm that 2 records were removed
val hudiDF: DataFrame = spark.read.format("hudi").load(path)
println(s"Delete After Count = ${
hudiDF.count()}")
}
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "hty") // access HDFS as this user to avoid permission errors
// Create the SparkSession instance and set its properties
val spark: SparkSession = {
SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
// Use Kryo serialization
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
}
// Define the table name and the storage path
val tableName: String = "tbl_trips_cow"
val tablePath: String = "/hudi_warehouse/tbl_trips_cow"
// Build a data generator to simulate business data
import org.apache.hudi.QuickstartUtils._
// Task 1: generate data and insert it into the Hudi table, using the COW table type
//insertData(spark, tableName, tablePath)
// Task 2: snapshot query (Snapshot Query) of the data, using the DSL
//queryData(spark, tablePath)
//queryDataByTime(spark, tablePath)
// Task 3: update (Update) data. Step 1: generate data; step 2: generate updated values
// for the step-1 records; step 3: write the updates to the Hudi table
val dataGen: DataGenerator = new DataGenerator()
//insertData(spark, tableName, tablePath, dataGen)
//updateData(spark, tableName, tablePath, dataGen)
// Task 4: incremental query (Incremental Query) of the data, using SQL
//incrementalQueryData(spark, tablePath)
// Task 5: delete (Delete) data
deleteData(spark, tableName, tablePath)
// Stop the application and release resources
spark.stop()
}
}
Test
Run the insertData(spark, tableName, tablePath) method first, then query the table with a snapshot query:
queryData(spark, tablePath)
Incremental query (Incremental Query) of the data:
incrementalQueryData(spark, tablePath)
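Besides querying through Spark, you can sanity-check that the writes actually reached HDFS by listing the table directory (a quick check, assuming the table path /hudi_warehouse/tbl_trips_cow from the code above and an HDFS client configured against the same cluster as core-site.xml):
hdfs dfs -ls /hudi_warehouse/tbl_trips_cow
# The .hoodie subdirectory holds the Hudi timeline (commit metadata)
hdfs dfs -ls /hudi_warehouse/tbl_trips_cow/.hoodie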
Reference material
https://www.bilibili.com/video/BV1sb4y1n7hK?p=21&vd_source=e21134e00867aeadc3c6b37bb38b9eee