Using Hudi in IDEA
2022-07-03 09:21:00 【Did Xiao Hu get stronger today】
Environment preparation
- Create a Maven project.
- Create a remote connection to the server:
Tools -> Deployment -> Browse Remote Host
Configure the connection and enter the server's account and password.
Click Test Connection; if the "Successfully connected" message appears, the configuration is correct.
- Copy the three files core-site.xml, hdfs-site.xml, and log4j.properties from the Hadoop configuration into the resources folder.
Set log4j.properties so that only warning and error messages are printed:
log4j.rootCategory=WARN, console
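If you write log4j.properties yourself instead of copying an existing one, the single line above is not enough on its own: log4j also needs a console appender definition. A minimal sketch, following Spark's default log4j template (the layout below is an assumption, adjust to your setup):
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n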
- Add the following to the pom.xml file:
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>jboss</id>
<url>http://repository.jboss.com/nexus/content/groups/public</url>
</repository>
</repositories>
<properties>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.0.0</spark.version>
<hadoop.version>2.7.3</hadoop.version>
<hudi.version>0.9.0</hudi.version>
</properties>
<dependencies>
<!-- Scala language dependency -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark Core dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark SQL dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Hadoop client dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- hudi-spark3 -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark3-bundle_2.12</artifactId>
<version>${hudi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/classes</outputDirectory>
<testOutputDirectory>target/test-classes</testOutputDirectory>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
</resources>
<!-- Maven compile plugins -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Comment out the following block that was generated when the project was created; otherwise the dependencies keep reporting errors:
<!-- <properties>-->
<!-- <maven.compiler.source>8</maven.compiler.source>-->
<!-- <maven.compiler.target>8</maven.compiler.target>-->
<!-- </properties>-->
The code structure:
Core code
import org.apache.hudi.QuickstartUtils.DataGenerator
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
 * Hudi data lake framework, based on the Spark compute engine, performing CRUD operations on data.
 * Uses the taxi trip data produced by the official data generator.
 *
 * Task 1: simulate data and insert it into a Hudi table, using COW mode
 * Task 2: snapshot query (Snapshot Query) of the data, using the DSL style
 * Task 3: update (Update) the data
 * Task 4: incremental query (Incremental Query) of the data, using the SQL style
 * Task 5: delete (Delete) the data
 */
object HudiSparkDemo {
/**
 * Official example: simulate data and insert it into a Hudi table; the table type is COW
 */
def insertData(spark: SparkSession, table: String, path: String): Unit = {
import spark.implicits._
// Step 1: simulate ride data
import org.apache.hudi.QuickstartUtils._
val dataGen: DataGenerator = new DataGenerator()
val inserts = convertToStringList(dataGen.generateInserts(100))
import scala.collection.JavaConverters._
val insertDF: DataFrame = spark.read.json(
spark.sparkContext.parallelize(inserts.asScala, 2).toDS()
)
// insertDF.printSchema()
// insertDF.show(10, truncate = false)
// Step 2: insert the data into the Hudi table
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
insertDF.write
.mode(SaveMode.Append)
.format("hudi")
.option("hoodie.insert.shuffle.parallelism", 2)
.option("hoodie.insert.shuffle.parallelism", 2)
//Hudi Property settings of the table
.option(PRECOMBINE_FIELD.key(), "ts")
.option(RECORDKEY_FIELD.key(), "uuid")
.option(PARTITIONPATH_FIELD.key(), "partitionpath")
.option(TBL_NAME.key(), table)
.save(path)
}
/**
 * Query table data with a snapshot query (Snapshot Query)
 */
def queryData(spark: SparkSession, path: String): Unit = {
import spark.implicits._
val tripsDF: DataFrame = spark.read.format("hudi").load(path)
// tripsDF.printSchema()
// tripsDF.show(10, truncate = false)
// Query trips whose fare is between 20 and 50
tripsDF
.filter($"fare" >= 20 && $"fare" <=50)
.select($"driver", $"rider", $"fare", $"begin_lat", $"begin_lon", $"partitionpath", $"_hoodie_commit_time")
.orderBy($"fare".desc, $"_hoodie_commit_time".desc)
.show(20, truncate = false)
}
def queryDataByTime(spark: SparkSession, path: String):Unit = {
import org.apache.spark.sql.functions._
// Option 1: specify the instant as a compact timestamp string (yyyyMMddHHmmss) and query the data as of that point in time
val df1 = spark.read
.format("hudi")
.option("as.of.instant", "20220610160908")
.load(path)
.sort(col("_hoodie_commit_time").desc)
df1.printSchema()
df1.show(numRows = 5, truncate = false)
// Option 2: specify the instant as a date-time string (yyyy-MM-dd HH:mm:ss)
val df2 = spark.read
.format("hudi")
.option("as.of.instant", "2022-06-10 16:09:08")
.load(path)
.sort(col("_hoodie_commit_time").desc)
df2.printSchema()
df2.show(numRows = 5, truncate = false)
}
/**
 * Insert data, taking the DataGenerator as a parameter so that later updates and deletes can reuse the same generator
 */
def insertData(spark: SparkSession, table: String, path: String, dataGen: DataGenerator): Unit = {
import spark.implicits._
// Step 1: simulate ride data
import org.apache.hudi.QuickstartUtils._
val inserts = convertToStringList(dataGen.generateInserts(100))
import scala.collection.JavaConverters._
val insertDF: DataFrame = spark.read.json(
spark.sparkContext.parallelize(inserts.asScala, 2).toDS()
)
// insertDF.printSchema()
// insertDF.show(10, truncate = false)
// Step 2: insert the data into the Hudi table
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
insertDF.write
// Use Overwrite mode for the initial load
.mode(SaveMode.Overwrite)
.format("hudi")
.option("hoodie.insert.shuffle.parallelism", 2)
.option("hoodie.insert.shuffle.parallelism", 2)
//Hudi Property settings of the table
.option(PRECOMBINE_FIELD.key(), "ts")
.option(RECORDKEY_FIELD.key(), "uuid")
.option(PARTITIONPATH_FIELD.key(), "partitionpath")
.option(TBL_NAME.key(), table)
.save(path)
}
/**
 * Simulate updates to records already in the Hudi table and write them back to the table
 */
def updateData(spark: SparkSession, table: String, path: String, dataGen: DataGenerator):Unit = {
import spark.implicits._
// Step 1: simulate ride data
import org.apache.hudi.QuickstartUtils._
// Generate updated data
val updates = convertToStringList(dataGen.generateUpdates(100))
import scala.collection.JavaConverters._
val updateDF: DataFrame = spark.read.json(
spark.sparkContext.parallelize(updates.asScala, 2).toDS()
)
// Step 2: write the update data into the Hudi table
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
updateDF.write
// Append mode
.mode(SaveMode.Append)
.format("hudi")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
// Hudi table property settings
.option(PRECOMBINE_FIELD.key(), "ts")
.option(RECORDKEY_FIELD.key(), "uuid")
.option(PARTITIONPATH_FIELD.key(), "partitionpath")
.option(TBL_NAME.key(), table)
.save(path)
}
/**
 * Query data incrementally (Incremental Query); a begin timestamp must be specified
 */
def incrementalQueryData(spark: SparkSession, path: String): Unit = {
import spark.implicits._
// Step 1: load the Hudi table data and obtain the commit times, used as the threshold for the incremental query
import org.apache.hudi.DataSourceReadOptions._
spark.read
.format("hudi")
.load(path)
.createOrReplaceTempView("view_temp_hudi_trips")
val commits: Array[String] = spark
.sql(
""" |select | distinct(_hoodie_commit_time) as commitTime |from | view_temp_hudi_trips |order by | commitTime DESC |""".stripMargin
)
.map(row => row.getString(0))
.take(50)
val beginTime = commits(commits.length - 1) // commit time we are interested in
println(s"beginTime = ${
beginTime}")
// Step 2: set the commit-time threshold and perform the incremental query
val tripsIncrementalDF = spark.read
.format("hudi")
// Set the query type to incremental read
.option(QUERY_TYPE.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL)
// Set the start time when incrementally reading data
.option(BEGIN_INSTANTTIME.key(), beginTime)
.load(path)
// Step 3: register the incremental query result as a temporary view and query trips whose fare is greater than 20
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
spark
.sql(
""" |select | `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts |from | hudi_trips_incremental |where | fare > 20.0 |""".stripMargin
)
.show(10, truncate = false)
}
/**
 * Delete Hudi table data by the primary key uuid; for a partitioned table, the partition path must also be specified
 */
def deleteData(spark: SparkSession, table: String, path: String): Unit = {
import spark.implicits._
// Step 1: load the Hudi table data and count the records
val tripsDF: DataFrame = spark.read.format("hudi").load(path)
println(s"Raw Count = ${
tripsDF.count()}")
// Step 2: simulate the data to be deleted: take a few records from the Hudi table and convert them into a delete dataset
val dataframe = tripsDF.limit(2).select($"uuid", $"partitionpath")
import org.apache.hudi.QuickstartUtils._
val dataGenerator = new DataGenerator()
val deletes = dataGenerator.generateDeletes(dataframe.collectAsList())
import scala.collection.JavaConverters._
val deleteDF = spark.read.json(spark.sparkContext.parallelize(deletes.asScala, 2))
// Step 3: write the data back to the Hudi table with the operation type set to DELETE
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
deleteDF.write
.mode(SaveMode.Append)
.format("hudi")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
// Set the write operation type to delete (the default is upsert)
.option(OPERATION.key(), "delete")
.option(PRECOMBINE_FIELD.key(), "ts")
.option(RECORDKEY_FIELD.key(), "uuid")
.option(PARTITIONPATH_FIELD.key(), "partitionpath")
.option(TBL_NAME.key(), table)
.save(path)
// Step 4: reload the Hudi table data and count the records to check that 2 records have been removed
val hudiDF: DataFrame = spark.read.format("hudi").load(path)
println(s"Delete After Count = ${
hudiDF.count()}")
}
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","hty")
// Create the SparkSession instance and set its properties
val spark: SparkSession = {
SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
// Set the serializer to Kryo
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
}
// Define variables: table name and save path
val tableName: String = "tbl_trips_cow"
val tablePath: String = "/hudi_warehouse/tbl_trips_cow"
// Build a data generator for simulating business data
import org.apache.hudi.QuickstartUtils._
// Task 1: simulate data and insert it into a Hudi table, using COW mode
//insertData(spark, tableName, tablePath)
// Task 2: snapshot query (Snapshot Query) of the data, using the DSL style
//queryData(spark, tablePath)
//queryDataByTime(spark, tablePath)
// Task 3: update (Update) data. Step 1: simulate and insert data; Step 2: simulate updates
// to the field values of the data from step 1; Step 3: write the updates to the Hudi table
val dataGen: DataGenerator = new DataGenerator()
//insertData(spark, tableName, tablePath, dataGen)
//updateData(spark, tableName, tablePath, dataGen)
// Task 4: incremental query (Incremental Query) of the data, using the SQL style
//incrementalQueryData(spark, tablePath)
// Task 5: delete (Delete) data
deleteData(spark, tableName, tablePath)
// End of the application; release resources
spark.stop()
}
}
Test
Run the insertData(spark, tableName, tablePath) method first, then query the table with a snapshot query:
queryData(spark, tablePath)
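For reference, this corresponds to the following call order inside main() (a minimal sketch; it assumes the HDFS path /hudi_warehouse/tbl_trips_cow defined above is reachable):
// Minimal test sequence: write first, then read back with a snapshot query
insertData(spark, tableName, tablePath) // Task 1: insert simulated data into the COW table
queryData(spark, tablePath)             // Task 2: snapshot query of the freshly written data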
Then run an incremental query (Incremental Query) on the data:
incrementalQueryData(spark, tablePath)
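Note that the incremental query only returns rows once the table has more than one commit, because beginTime is taken from the earliest commit and the query returns records written after that instant. A minimal sketch of the sequence (reusing the dataGen instance from main()):
// At least two commits are needed before the incremental query shows any data
insertData(spark, tableName, tablePath, dataGen) // first commit (Overwrite)
updateData(spark, tableName, tablePath, dataGen) // second commit (Append / upsert)
incrementalQueryData(spark, tablePath)           // returns records committed after beginTime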
Reference material
https://www.bilibili.com/video/BV1sb4y1n7hK?p=21&vd_source=e21134e00867aeadc3c6b37bb38b9eee