Using Hudi in IDEA
2022-07-03 09:21:00 【Did Xiao Hu get stronger today】
Environment preparation
- Create a Maven project.
- Create a remote connection to the server:
  Tools -> Deployment -> Browse Remote Host
  Configure the connection, entering the account and password of the server.
  Click Test Connection; if it reports Successfully, the configuration is done.
- Copy Hadoop's core-site.xml, hdfs-site.xml and log4j.properties into the resources folder.
  Set log4j.properties to print only warning-level and higher messages:
  log4j.rootCategory=WARN, console
- Add the following to the pom.xml file:
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>jboss</id>
<url>http://repository.jboss.com/nexus/content/groups/public</url>
</repository>
</repositories>
<properties>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.0.0</spark.version>
<hadoop.version>2.7.3</hadoop.version>
<hudi.version>0.9.0</hudi.version>
</properties>
<dependencies>
<!-- Scala language dependency -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark Core dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark SQL dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Hadoop Client dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- hudi-spark3 -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark3-bundle_2.12</artifactId>
<version>${hudi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/classes</outputDirectory>
<testOutputDirectory>target/test-classes</testOutputDirectory>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
</resources>
<!-- Maven compiler plugins -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Comment out the following block that is generated when the project is created; otherwise the dependencies keep reporting errors:
<!-- <properties>-->
<!-- <maven.compiler.source>8</maven.compiler.source>-->
<!-- <maven.compiler.target>8</maven.compiler.target>-->
<!-- </properties>-->
The code structure:
Core code

import org.apache.hudi.QuickstartUtils.DataGenerator
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * Hudi data lake framework, based on the Spark compute engine, performing CRUD operations on data,
 * using the taxi-trip data produced by the official data generator.
 *
 * Task 1: generate data and insert it into a Hudi table, using the COW table type
 * Task 2: snapshot query (Snapshot Query) of the data, using the DSL style
 * Task 3: update (Update) the data
 * Task 4: incremental query (Incremental Query) of the data, using the SQL style
 * Task 5: delete (Delete) the data
 */
object HudiSparkDemo {
/**
 * Official example: generate simulated data and insert it into a Hudi table; the table type is COW.
 */
def insertData(spark: SparkSession, table: String, path: String): Unit = {
import spark.implicits._
// Step 1: generate simulated ride data
import org.apache.hudi.QuickstartUtils._
val dataGen: DataGenerator = new DataGenerator()
val inserts = convertToStringList(dataGen.generateInserts(100))
import scala.collection.JavaConverters._
val insertDF: DataFrame = spark.read.json(
spark.sparkContext.parallelize(inserts.asScala, 2).toDS()
)
// insertDF.printSchema()
// insertDF.show(10, truncate = false)
// Step 2: insert the data into the Hudi table
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
insertDF.write
.mode(SaveMode.Append)
.format("hudi")
.option("hoodie.insert.shuffle.parallelism", 2)
.option("hoodie.upsert.shuffle.parallelism", 2)
// Hudi table property settings
.option(PRECOMBINE_FIELD.key(), "ts")
.option(RECORDKEY_FIELD.key(), "uuid")
.option(PARTITIONPATH_FIELD.key(), "partitionpath")
.option(TBL_NAME.key(), table)
.save(path)
}
/**
 * Query the table data with a snapshot query (Snapshot Query).
 */
def queryData(spark: SparkSession, path: String): Unit = {
import spark.implicits._
val tripsDF: DataFrame = spark.read.format("hudi").load(path)
// tripsDF.printSchema()
// tripsDF.show(10, truncate = false)
// Query ride records whose fare is between 20 and 50
tripsDF
.filter($"fare" >= 20 && $"fare" <= 50)
.select($"driver", $"rider", $"fare", $"begin_lat", $"begin_lon", $"partitionpath", $"_hoodie_commit_time")
.orderBy($"fare".desc, $"_hoodie_commit_time".desc)
.show(20, truncate = false)
}
def queryDataByTime(spark: SparkSession, path: String):Unit = {
import org.apache.spark.sql.functions._
// Option 1: time travel with the commit instant given as a compact timestamp string
val df1 = spark.read
.format("hudi")
.option("as.of.instant", "20220610160908")
.load(path)
.sort(col("_hoodie_commit_time").desc)
df1.printSchema()
df1.show(numRows = 5, truncate = false)
// Option 2: time travel with the instant given as a "yyyy-MM-dd HH:mm:ss" date-time string
val df2 = spark.read
.format("hudi")
.option("as.of.instant", "2022-06-10 16:09:08")
.load(path)
.sort(col("_hoodie_commit_time").desc)
df2.printSchema()
df2.show(numRows = 5, truncate = false)
}
/**
 * Variant of insertData that takes the DataGenerator as a parameter,
 * so the same generator can later be used to generate updates.
 */
def insertData(spark: SparkSession, table: String, path: String, dataGen: DataGenerator): Unit = {
import spark.implicits._
// Step 1: generate simulated ride data
import org.apache.hudi.QuickstartUtils._
val inserts = convertToStringList(dataGen.generateInserts(100))
import scala.collection.JavaConverters._
val insertDF: DataFrame = spark.read.json(
spark.sparkContext.parallelize(inserts.asScala, 2).toDS()
)
// insertDF.printSchema()
// insertDF.show(10, truncate = false)
// Step 2: insert the data into the Hudi table
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
insertDF.write
// Use Overwrite mode here
.mode(SaveMode.Overwrite)
.format("hudi")
.option("hoodie.insert.shuffle.parallelism", 2)
.option("hoodie.upsert.shuffle.parallelism", 2)
// Hudi table property settings
.option(PRECOMBINE_FIELD.key(), "ts")
.option(RECORDKEY_FIELD.key(), "uuid")
.option(PARTITIONPATH_FIELD.key(), "partitionpath")
.option(TBL_NAME.key(), table)
.save(path)
}
/**
 * Generate simulated updates for the data in the Hudi table and upsert them into the table.
 */
def updateData(spark: SparkSession, table: String, path: String, dataGen: DataGenerator):Unit = {
import spark.implicits._
// Step 1: generate simulated ride data
import org.apache.hudi.QuickstartUtils._
// Generate updated data
val updates = convertToStringList(dataGen.generateUpdates(100))
import scala.collection.JavaConverters._
val updateDF: DataFrame = spark.read.json(
spark.sparkContext.parallelize(updates.asScala, 2).toDS()
)
// Step 2: upsert the update data into the Hudi table
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
updateDF.write
// Append mode
.mode(SaveMode.Append)
.format("hudi")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
// Hudi table property settings
.option(PRECOMBINE_FIELD.key(), "ts")
.option(RECORDKEY_FIELD.key(), "uuid")
.option(PARTITIONPATH_FIELD.key(), "partitionpath")
.option(TBL_NAME.key(), table)
.save(path)
}
/**
 * Query data incrementally (Incremental Query); a begin commit timestamp must be specified.
 */
def incrementalQueryData(spark: SparkSession, path: String): Unit = {
import spark.implicits._
// Step 1: load the Hudi table and collect the commit times, used as the threshold for the incremental query
import org.apache.hudi.DataSourceReadOptions._
spark.read
.format("hudi")
.load(path)
.createOrReplaceTempView("view_temp_hudi_trips")
val commits: Array[String] = spark
.sql(
  """
    |select
    |  distinct(_hoodie_commit_time) as commitTime
    |from
    |  view_temp_hudi_trips
    |order by
    |  commitTime DESC
    |""".stripMargin
)
.map(row => row.getString(0))
.take(50)
val beginTime = commits(commits.length - 1) // commit time we are interested in
println(s"beginTime = $beginTime")
// Step 2: set the commit-time threshold and run the incremental query
val tripsIncrementalDF = spark.read
.format("hudi")
// Set the query type to incremental read
.option(QUERY_TYPE.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL)
// Set the start time when incrementally reading data
.option(BEGIN_INSTANTTIME.key(), beginTime)
.load(path)
// Step 3: register the incremental result as a temporary view and query records with fare greater than 20
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
spark
.sql(
  """
    |select
    |  `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts
    |from
    |  hudi_trips_incremental
    |where
    |  fare > 20.0
    |""".stripMargin
)
.show(10, truncate = false)
}
/**
 * Delete data from the Hudi table by the record key uuid; for a partitioned table,
 * the partition path must also be specified.
 */
def deleteData(spark: SparkSession, table: String, path: String): Unit = {
import spark.implicits._
// Step 1: load the Hudi table and count the records
val tripsDF: DataFrame = spark.read.format("hudi").load(path)
println(s"Raw Count = ${tripsDF.count()}")
// Step 2: build the data to delete by taking two records from the Hudi table
val dataframe = tripsDF.limit(2).select($"uuid", $"partitionpath")
import org.apache.hudi.QuickstartUtils._
val dataGenerator = new DataGenerator()
val deletes = dataGenerator.generateDeletes(dataframe.collectAsList())
import scala.collection.JavaConverters._
val deleteDF = spark.read.json(spark.sparkContext.parallelize(deletes.asScala, 2))
// Step 3: write the data to the Hudi table with the operation type set to delete
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
deleteDF.write
.mode(SaveMode.Append)
.format("hudi")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
// Set the write operation type to delete (the default is upsert)
.option(OPERATION.key(), "delete")
.option(PRECOMBINE_FIELD.key(), "ts")
.option(RECORDKEY_FIELD.key(), "uuid")
.option(PARTITIONPATH_FIELD.key(), "partitionpath")
.option(TBL_NAME.key(), table)
.save(path)
// Step 4: reload the Hudi table and count again; the count should have dropped by 2
val hudiDF: DataFrame = spark.read.format("hudi").load(path)
println(s"Delete After Count = ${hudiDF.count()}")
}
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","hty")
// Create the SparkSession instance and set its properties
val spark: SparkSession = {
SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
// Use Kryo serialization
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
}
// Define the table name and the storage path
val tableName: String = "tbl_trips_cow"
val tablePath: String = "/hudi_warehouse/tbl_trips_cow"
// Build a data generator to simulate business data
import org.apache.hudi.QuickstartUtils._
// Task 1: generate data and insert it into a Hudi table (COW type)
//insertData(spark, tableName, tablePath)
// Task 2: snapshot query (Snapshot Query) of the data, using the DSL style
//queryData(spark, tablePath)
//queryDataByTime(spark, tablePath)
// Task 3: update (Update) data. Step 1: generate and insert data; Step 2: generate updates to the
// data from step 1; Step 3: write the updates to the Hudi table
val dataGen: DataGenerator = new DataGenerator()
//insertData(spark, tableName, tablePath, dataGen)
//updateData(spark, tableName, tablePath, dataGen)
// Task 4: incremental query (Incremental Query) of the data, using the SQL style
//incrementalQueryData(spark, tablePath)
// Task 5: delete (Delete) data
deleteData(spark, tableName, tablePath)
// Close resources when the application finishes
spark.stop()
}
}
Test
Run the insertData(spark, tableName, tablePath) method first, then query the table with a snapshot query:
queryData(spark, tablePath)
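The queryData method above uses the DataFrame DSL. For reference, the same snapshot query can also be expressed through Spark SQL by registering the Hudi table as a temporary view; the following is only a minimal sketch (the variable snapshotDF, the view name hudi_trips_snapshot and the selected columns are illustrative, not part of the original code):
// Sketch: snapshot query through Spark SQL; the view name "hudi_trips_snapshot" is arbitrary
val snapshotDF: DataFrame = spark.read.format("hudi").load(tablePath)
// Register a temporary view and query it with SQL
snapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql(
  """
    |select fare, begin_lon, begin_lat, ts, `_hoodie_commit_time`
    |from hudi_trips_snapshot
    |where fare > 20.0
    |""".stripMargin
).show(10, truncate = false)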
Query the data incrementally (Incremental Query):
incrementalQueryData(spark, tablePath)
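Note that incrementalQueryData takes the earliest commit time in the table as its begin time and only returns records committed after it, so the table needs at least two commits before this call shows anything; for example, run the parameterized insert and then the update first. A minimal sketch of the call order in main, reusing the methods defined above:
// Sketch: call order so the incremental query has an earlier commit to start from
val dataGen: DataGenerator = new DataGenerator()
insertData(spark, tableName, tablePath, dataGen)  // first commit (Overwrite insert)
updateData(spark, tableName, tablePath, dataGen)  // second commit (upsert of updates)
incrementalQueryData(spark, tablePath)            // shows only records written after the first commit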
Reference material
https://www.bilibili.com/video/BV1sb4y1n7hK?p=21&vd_source=e21134e00867aeadc3c6b37bb38b9eee