Spark Structured Streaming Writing to Hudi in Practice
2022-07-03 09:00:00 【小胡今天有变强吗】
Overview
This post integrates Spark Structured Streaming with Hudi to write streaming data into a Hudi table in real time: each micro-batch DataFrame is written with the Spark DataSource API.
The pipeline uses the same configuration files as the previous post: https://blog.csdn.net/hshudoudou/article/details/125303310?spm=1001.2014.3001.5501.
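To make this pattern concrete before walking through the full programs, here is a condensed sketch: read the Kafka topic as a stream, then inside foreachBatch write each micro-batch DataFrame to Hudi with the batch DataSource API. The broker address, topic, table path, and record-key/precombine/partition fields mirror the code later in this post; the app name, checkpoint path, and the use of the Kafka record timestamp for ts/day are simplifications made for the sketch, not the author's actual implementation.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}

// Condensed sketch only; the full MockOrderProducer and HudiStructuredDemo follow below.
object StreamToHudiSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("stream-to-hudi-sketch").master("local[2]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // 1. Read the Kafka topic as a streaming DataFrame
    val kafkaDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "hadoop102:9092")
      .option("subscribe", "order-topic")
      .load()

    // 2. Minimal ETL: message key as record key, Kafka record timestamp as precombine/partition source
    val etlDF = kafkaDF
      .selectExpr("CAST(key AS STRING) AS order_id", "timestamp AS ts")
      .withColumn("day", date_format(col("ts"), "yyyy-MM-dd"))

    // 3. Write each micro-batch DataFrame to Hudi via the batch DataSource API
    etlDF.writeStream
      .outputMode(OutputMode.Append())
      .foreachBatch((batchDF: Dataset[Row], batchId: Long) => {
        batchDF.write
          .mode(SaveMode.Append)
          .format("hudi")
          .option("hoodie.table.name", "tbl_hudi_order")
          .option("hoodie.datasource.write.recordkey.field", "order_id")
          .option("hoodie.datasource.write.precombine.field", "ts")
          .option("hoodie.datasource.write.partitionpath.field", "day")
          .save("/hudi-warehouse/tbl_hudi_order")
      })
      .option("checkpointLocation", "/tmp/hudi-sketch-ckpt") // illustrative path
      .start()

    spark.streams.awaitAnyTermination()
  }
}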
The project structure is shown in the figure below:
The two Spark programs under the stream package are the focus of this post.
Code
- MockOrderProducer.scala
Simulates order data in real time, serializes each order to a JSON string with the json4s library, and sends it to a Kafka topic.
import java.util.Properties
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json
import scala.util.Random

/**
 * Order entity (case class)
 *
 * @param orderId     order ID
 * @param userId      user ID
 * @param orderTime   order date/time
 * @param ip          IP address the order was placed from
 * @param orderMoney  order amount
 * @param orderStatus order status
 */
case class OrderRecord(
  orderId: String,
  userId: String,
  orderTime: String,
  ip: String,
  orderMoney: Double,
  orderStatus: Int
)

/**
 * Simulates order data and sends it to a Kafka topic.
 * Each message is a String in JSON format: the OrderRecord instance is
 * converted to a JSON string (using the json4s library).
 */
object MockOrderProducer {

  def main(args: Array[String]): Unit = {
    var producer: KafkaProducer[String, String] = null
    try {
      // 1. Kafka producer configuration
      val props = new Properties()
      props.put("bootstrap.servers", "hadoop102:9092")
      props.put("acks", "1")
      props.put("retries", "3")
      props.put("key.serializer", classOf[StringSerializer].getName)
      props.put("value.serializer", classOf[StringSerializer].getName)

      // 2. Create the KafkaProducer with the configuration above
      producer = new KafkaProducer[String, String](props)

      // Random generator
      val random: Random = new Random()
      // Order status: 0 = open, 1 = cancelled, 2 = closed, 3 = completed
      val allStatus = Array(0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)

      while (true) {
        // Number of orders generated per loop iteration
        val batchNumber: Int = random.nextInt(1) + 20
        (1 to batchNumber).foreach { number =>
          val currentTime: Long = System.currentTimeMillis()
          val orderId: String = s"${getDate(currentTime)}%06d".format(number)
          val userId: String = s"${1 + random.nextInt(5)}%08d".format(random.nextInt(1000))
          val orderTime: String = getDate(currentTime, format = "yyyy-MM-dd HH:mm:ss.SSS")
          val orderMoney: String = s"${5 + random.nextInt(500)}.%02d".format(random.nextInt(100))
          val orderStatus: Int = allStatus(random.nextInt(allStatus.length))

          // 3. Build the order record
          val orderRecord: OrderRecord = OrderRecord(
            orderId, userId, orderTime, getRandomIp, orderMoney.toDouble, orderStatus
          )
          // Convert it to a JSON string
          val orderJson = new Json(org.json4s.DefaultFormats).write(orderRecord)
          println(orderJson)

          // 4. Build the ProducerRecord
          val record = new ProducerRecord[String, String]("order-topic", orderId, orderJson)
          // 5. Send the record to the topic
          producer.send(record)
        }
        // Thread.sleep(random.nextInt(500) + 5000)
        Thread.sleep(random.nextInt(500))
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (null != producer) producer.close()
    }
  }

  /** ================= Format the given timestamp ================= */
  def getDate(time: Long, format: String = "yyyyMMddHHmmssSSS"): String = {
    val fastFormat: FastDateFormat = FastDateFormat.getInstance(format)
    val formatDate: String = fastFormat.format(time) // format the timestamp
    formatDate
  }

  /** ================= Generate a random IP address ================= */
  def getRandomIp: String = {
    // IP ranges (stored as signed Int values)
    val range: Array[(Int, Int)] = Array(
      (607649792, 608174079),     // 36.56.0.0   - 36.63.255.255
      (1038614528, 1039007743),   // 61.232.0.0  - 61.237.255.255
      (1783627776, 1784676351),   // 106.80.0.0  - 106.95.255.255
      (2035023872, 2035154943),   // 121.76.0.0  - 121.77.255.255
      (2078801920, 2079064063),   // 123.232.0.0 - 123.235.255.255
      (-1950089216, -1948778497), // 139.196.0.0 - 139.215.255.255
      (-1425539072, -1425014785), // 171.8.0.0   - 171.15.255.255
      (-1236271104, -1235419137), // 182.80.0.0  - 182.92.255.255
      (-770113536, -768606209),   // 210.25.0.0  - 210.47.255.255
      (-569376768, -564133889)    // 222.16.0.0  - 222.95.255.255
    )
    // Pick a random range index, then a random address inside that range
    val random = new Random()
    val index = random.nextInt(10)
    val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)
    // Convert the Int value to dotted IPv4 notation
    number2IpString(ipNumber)
  }

  /** ================= Convert an Int IPv4 address to a String ================= */
  def number2IpString(ip: Int): String = {
    val buffer: Array[Int] = new Array[Int](4)
    buffer(0) = (ip >> 24) & 0xff
    buffer(1) = (ip >> 16) & 0xff
    buffer(2) = (ip >> 8) & 0xff
    buffer(3) = ip & 0xff
    // Return the IPv4 address string
    buffer.mkString(".")
  }
}
Remember to adjust the Kafka producer's bootstrap.servers setting to match your own environment.
- HudiStructuredDemo.scala
A Structured Streaming application that consumes JSON data from the Kafka topic order-topic in real time, applies an ETL transformation, and stores the result in a Hudi table.
import com.tianyi.hudi.didi.SparkUtils
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}

/**
 * Consumes data from Kafka in real time with Structured Streaming,
 * applies an ETL transformation, and stores the result in a Hudi table.
 */
object HudiStructuredDemo {

  /** Consume the given Kafka topic as a streaming DataFrame */
  def readFromKafka(spark: SparkSession, topicName: String): DataFrame = {
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "hadoop102:9092")
      .option("subscribe", topicName)
      .option("startingOffsets", "latest")
      .option("maxOffsetsPerTrigger", 100000)
      .option("failOnDataLoss", "false")
      .load()
  }

  /** ETL: parse the JSON message and derive the fields needed by the Hudi table */
  def process(streamDF: DataFrame): DataFrame = {
    streamDF
      // Select the raw Kafka columns
      .selectExpr(
        "CAST(key AS STRING) order_id",
        "CAST(value AS STRING) AS message",
        "topic", "partition", "offset", "timestamp"
      )
      // Parse the message and extract the individual fields
      .withColumn("user_id", get_json_object(col("message"), "$.userId"))
      .withColumn("order_time", get_json_object(col("message"), "$.orderTime"))
      .withColumn("ip", get_json_object(col("message"), "$.ip"))
      .withColumn("order_money", get_json_object(col("message"), "$.orderMoney"))
      .withColumn("order_status", get_json_object(col("message"), "$.orderStatus"))
      // Drop the raw message column
      .drop(col("message"))
      // Convert the order time to a timestamp, used as the precombine (merge) field of the Hudi table
      .withColumn("ts", to_timestamp(col("order_time"), "yyyy-MM-dd HH:mm:ss.SSS"))
      // Extract the partition date (yyyy-MM-dd) from the order time
      .withColumn("day", substring(col("order_time"), 0, 10))
  }

  /** Save the streaming DataFrame to a Hudi table */
  def saveToHudi(streamDF: DataFrame) = {
    streamDF.writeStream
      .outputMode(OutputMode.Append())
      .queryName("query-hudi-streaming")
      .foreachBatch((batchDF: Dataset[Row], batchId: Long) => {
        println(s"============== BatchId: ${batchId} start ==============")
        import org.apache.hudi.DataSourceWriteOptions._
        import org.apache.hudi.config.HoodieWriteConfig._
        import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
        batchDF.write
          .mode(SaveMode.Append)
          .format("hudi")
          .option("hoodie.insert.shuffle.parallelism", "2")
          .option("hoodie.upsert.shuffle.parallelism", "2")
          // Hudi table properties
          .option(RECORDKEY_FIELD.key(), "order_id")
          .option(PRECOMBINE_FIELD.key(), "ts")
          .option(PARTITIONPATH_FIELD.key(), "day")
          .option(TBL_NAME.key(), "tbl_hudi_order")
          .option(TABLE_TYPE.key(), "MERGE_ON_READ")
          // Partition directory layout consistent with Hive-style partitioning
          .option(HIVE_STYLE_PARTITIONING_ENABLE.key(), "true")
          .save("/hudi-warehouse/tbl_hudi_order")
      })
      .option("checkpointLocation", "/datas/hudi-spark/struct-ckpt-1001")
      .start()
  }

  def main(args: Array[String]): Unit = {
    // Set the Hadoop user name used when writing to HDFS
    System.setProperty("HADOOP_USER_NAME", "hty")
    // step1: build the SparkSession
    val spark: SparkSession = SparkUtils.createSpakSession(this.getClass)
    // step2: consume data from Kafka in real time
    val kafkaStreamDF: DataFrame = readFromKafka(spark, "order-topic")
    // step3: extract fields and convert data types
    val streamDF: DataFrame = process(kafkaStreamDF)
    // step4: save the data to a Hudi table (MERGE_ON_READ: reads merge base and log files)
    saveToHudi(streamDF)
    // step5: once the streaming query has started, wait for termination
    spark.streams.active.foreach(query => println(s"Query: ${query.name} is Running .........."))
    spark.streams.awaitAnyTermination()
  }
}
Again, adjust the server-related settings in the code to your own environment.
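The code above calls SparkUtils.createSpakSession, a helper from the previous post that is not reproduced here. In case you are not following along from that post, below is a minimal sketch of what such a helper might look like; it is an assumption, not the author's actual implementation (only the KryoSerializer setting is taken from the configuration used elsewhere in this post).

import org.apache.spark.sql.SparkSession

// Hypothetical stand-in for the SparkUtils helper referenced above.
object SparkUtils {
  def createSpakSession(clazz: Class[_], master: String = "local[2]"): SparkSession = {
    SparkSession.builder()
      .appName(clazz.getSimpleName.stripSuffix("$"))
      .master(master)
      // Hudi's Spark integration expects Kryo serialization (also set in the spark-shell command below)
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
  }
}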
Testing
- Test the simulated data generator
Run MockOrderProducer.scala to start producing simulated order data.
- Create the topic used by the code: order-topic (a programmatic way to create it is sketched at the end of this section).
The software shown in the figure below is a Kafka GUI client ("Offset"), which makes creating and managing topics very convenient; you can download it yourself and test the connection.
You can see that Kafka has received the data.
- Test HudiStructuredDemo.scala: run its main method to consume data from the Kafka topic order-topic and store it in HDFS as a Hudi table.
Check the storage under HDFS: all of the consumed data has been written successfully.
Many parquet files are produced, which is exactly how Hudi stores data. The number of parquet files does not keep growing indefinitely; it levels off within a certain range, which reflects Hudi's ability to automatically compact small files.
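For the topic-creation step above, if you prefer not to use a GUI client, a small Kafka AdminClient program can create the topic. This is a sketch under assumptions: the broker address matches the code in this post, while the partition count and replication factor (3 and 1) are illustrative values, not taken from the original setup.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

// Creates the order-topic used by MockOrderProducer and HudiStructuredDemo.
object CreateOrderTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092")
    val admin = AdminClient.create(props)
    try {
      // 3 partitions / replication factor 1 are illustrative; adjust to your cluster
      val topic = new NewTopic("order-topic", 3, 1.toShort)
      admin.createTopics(Collections.singleton(topic)).all().get()
      println("Created topic: order-topic")
    } finally {
      admin.close()
    }
  }
}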
Querying with the Spark shell
- Start spark-shell
./spark-shell --master local[2] \
--jars /opt/module/jars/hudi-jars/hudi-spark3-bundle_2.12-0.9.0.jar,\
/opt/module/jars/hudi-jars/spark-avro_2.12-3.0.1.jar,\
/opt/module/jars/hudi-jars/spark_unused-1.0.0.jar \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
- Create orderDF
val orderDF = spark.read.format("hudi").load("/hudi-warehouse/tbl_hudi_order")
- Inspect the schema
orderDF.printSchema()
- Show the first 10 rows
orderDF.show(10, false)
- Select specific columns
orderDF.select("order_id", "user_id", "order_time", "ip", "order_money", "order_status", "day").show(false)
- Aggregate statistics with SQL functions
// register the DataFrame as a temporary view
orderDF.createOrReplaceTempView("view_tmp_orders")
// run SQL to compute the statistics
spark.sql("""
  WITH tmp AS (
    SELECT CAST(order_money AS DOUBLE) AS order_money
    FROM view_tmp_orders
    WHERE order_status = '0'
  )
  SELECT
    MAX(order_money) AS max_money,
    MIN(order_money) AS min_money,
    ROUND(AVG(order_money), 2) AS avg_money
  FROM tmp
""").show
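The same statistic can also be computed with the DataFrame API instead of SQL; a minimal sketch, assuming orderDF was loaded as above:

import org.apache.spark.sql.functions._

orderDF
  .filter(col("order_status") === "0")
  .withColumn("order_money", col("order_money").cast("double"))
  .agg(
    max("order_money").as("max_money"),
    min("order_money").as("min_money"),
    round(avg("order_money"), 2).as("avg_money")
  )
  .show(false)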
This completes the end-to-end flow.
Summary
In this example, a mock producer program generates simulated order data and sends it to Kafka; Spark Structured Streaming consumes it in real time and writes it into a Hudi table.