Spark - Understanding Parquet
2022-06-26 03:59:00 【BIT_666】
One. Introduction
Parquet files are common in big data scenarios such as Spark, Hive, Streaming, and MapReduce. They achieve efficient data storage and retrieval through columnar storage and metadata storage, so this article focuses on how Parquet files are stored, read, and used in Spark, and the pitfalls you may run into.

Two. Parquet Loading Methods
1.SparkSession.read.parquet
SparkSession lives in the org.apache.spark.sql.SparkSession class. Besides reading Parquet columnar files, SparkSession also supports reading ORC columnar files; see: Spark Read ORC File.
val conf = new SparkConf()
  .setAppName("ParquetInfo")
  .setMaster("local")
val spark = SparkSession
  .builder
  .config(conf)
  .getOrCreate()

spark.read.parquet(path).foreach(row => {
  val head = row.getString(0)
  println(head)
})
Reading returns a sql.DataFrame that supports the common SQL-style operations. If you don't want to use SQL, you can also get an RDD[Row] via .rdd and then traverse the Iterator[Row] of each partition.
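A quick sketch of that .rdd route (it assumes the spark and path values from the snippet above): take the RDD[Row] from the DataFrame and walk the Iterator[Row] of each partition.
import org.apache.spark.sql.Row

spark.read.parquet(path).rdd.foreachPartition { (iter: Iterator[Row]) =>
  // each partition is handed to us as an Iterator[Row]
  iter.foreach(row => println(row.getString(0)))
}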
Tips:
You can run SQL statements on it afterwards. Initializing a SQLContext and calling its sql method is also supported, but SparkSession alone gets the job done.
val parquetFileDF = spark.read.parquet("path")
parquetFileDF.createOrReplaceTempView("tableName")
val resultDf = spark.sql("SELECT * FROM tableName")

val sqlContext = new SQLContext(sc)
sqlContext.sql("xxx")
2. SparkContext.hadoopFile
When reading with hadoopFile you need to specify the corresponding K-V types and the InputFormat. For Parquet files the K-V pair is Void-ArrayWritable and the InputFormat is org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; once you have the ArrayWritable you can get at the Writable values.
val sc = spark.sparkContext
sc.setLogLevel("error")
val parquetInfo = sc.hadoopFile(path, classOf[MapredParquetInputFormat], classOf[Void], classOf[ArrayWritable])
parquetInfo.take(5).foreach(info => {
  val writable = info._2.get()
  val head = writable(0)
  println(writable.length + "\t" + head)
})
Tips:
You need to add the serializer configuration to SparkConf, otherwise the hadoopFile method will report an error:

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")writable The specific content can only be obtained through deserialization , So it's recommended to use SparkSession The official api Read , But you can RcFile SparkSession Direct reading is not supported for the time being , So it can be used sc.hadoopRdd Method to read the data stored in the same column RcFile Format file , You can refer to : Spark Read RcFile
Three. Parquet Storage
1. Static conversion
Parquet -> Parquet: read a Parquet file into a sql.DataFrame and write it back out, similar to an RDD transform:
spark.read.parquet(path)
  .write.mode(SaveMode.Overwrite)
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .format("parquet")
  .save("/split")
2. RDD[T*] Conversion
An ordinary data RDD can be converted to a sql.DataFrame by adding import sqlContext.implicits._ for the implicit conversion, and then saved as Parquet. Below is a method that converts a PairRDD to a DataFrame and stores it:
import sqlContext.implicits._
val commonStringRdd = sc.emptyRDD[(String, String)].toDF()
commonStringRdd.write
  .mode(SaveMode.Overwrite)
  .format("parquet")
  .save("")
Tips:
SaveMode has four modes: Append, Overwrite, ErrorIfExists, and Ignore. The first two are self-explanatory; ErrorIfExists reports an error if the target path already exists, while Ignore skips the write and leaves the original data untouched if the path already exists. SaveMode is implemented as an enum:

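A small runnable sketch of the four modes in action (paths and data are placeholders); SaveMode itself is exposed as a Java enum with the values Append, Overwrite, ErrorIfExists, and Ignore.
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder.master("local").appName("SaveModeDemo").getOrCreate()
import spark.implicits._
val df = Seq(("a", 1), ("b", 2)).toDF("key", "value")

df.write.mode(SaveMode.Overwrite).parquet("./output/demo")       // replace whatever is at the path
df.write.mode(SaveMode.Append).parquet("./output/demo")          // add new files next to the old ones
df.write.mode(SaveMode.Ignore).parquet("./output/demo")          // path exists, so this write is skipped
// df.write.mode(SaveMode.ErrorIfExists).parquet("./output/demo") // would throw because the path exists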
For details on converting an RDD to a sql.DataFrame, see: Spark - RDD / Row / sql.DataFrame Conversion.
3. RDD[Row] Conversion
If you already have an RDD[Row], you can call sqlContext directly to convert the RDD to a DataFrame. Here TABLE_SCHEME can be seen as a description of each column of the data, similar to Hive's column information, mainly the field name and type, plus optional extra metadata. sqlContext matches the column attributes against the Row one by one; if the schema length is shorter than the Row's total number of columns, the trailing fields can only be read as null.
val sqlContext = new SQLContext(sc)

final val TABLE_SCHEME = StructType(Array(
  StructField("A", StringType),
  StructField("B", StringType),
  StructField("C", StringType),
  StructField("D", StringType),
  StructField("E", StringType),
  StructField("F", StringType),
  StructField("G", StringType),
  StructField("H", StringType)
))

val commonRowRdd = sc.emptyRDD[Row]
sqlContext.createDataFrame(commonRowRdd, TABLE_SCHEME)
  .write.mode(SaveMode.Overwrite)
  .format("parquet")
  .save("/split")
Tips:

Using the above syntax may produce the error: Illegal pattern component: XXX. This comes from the internal date format parsing; adding .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") to the code fixes it.
spark.read.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").parquet(path)
Four. A Brief Analysis of Parquet
Parquet is open source, supports multiple platforms and systems, and comes with efficient storage and encoding schemes, which makes it very well suited to task development in big data scenarios. Let's take a brief look at two of its characteristics: columnar storage and metadata storage.
1. Columnar Storage - Smaller IO
CSV is the most common row-oriented storage. In scenarios that only need individual features or columns, a CSV file has to be traversed line by line and split to finally reach the target element, whereas Parquet's columnar layout lets you access an individual feature directly, which improves execution efficiency and reduces data IO.
CSV: A,B,C,D,E -> Split(",")[col]
Parquet: A B C D E -> getString(col)
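A rough Spark sketch of the same difference (the column name _c1 and csvPath are assumptions): with Parquet a single column can be selected and only that column's data is touched, while reading a text/CSV file still pulls in the whole line before splitting it.
val oneColumn = spark.read.parquet(path).select("_c1")     // column pruning: only column _c1 is read
val fromCsv = sc.textFile(csvPath).map(_.split(",")(1))    // every full line is read, then split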
2. Metadata Storage - Higher Compression Ratio
Parquet uses multiple encodings to keep data storage efficient and space usage low.
A. Run-Length Encoding
Run-length encoding: when a column contains many consecutive duplicate values, they can be recorded as "X repeated N times", reducing the recording overhead. Even if N is very large, the storage cost stays small:
[1,2,1,1,1,1,2] -> 1-1,2-1,1-4,2-1
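A toy Scala sketch of the idea (not Parquet's actual implementation):
// Run-length encode: [1,2,1,1,1,1,2] -> (1,1),(2,1),(1,4),(2,1)
def runLength(xs: Seq[Int]): Seq[(Int, Int)] =
  xs.foldLeft(List.empty[(Int, Int)]) {
    case ((v, n) :: tail, x) if v == x => (v, n + 1) :: tail // extend the current run
    case (acc, x)                      => (x, 1) :: acc      // start a new run
  }.reverse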
B. Dictionary Encoding
Dictionary encoding: as the name suggests, it uses a mapping to save values that repeat many times, for example "0" -> "LongString":
[LongString, LongString, LongString] -> [0, 0, 0]
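Again as a toy sketch, a dictionary encoder that maps each distinct string to an integer id:
// Dictionary encode: [LongString, LongString, LongString] -> [0, 0, 0]
def dictionaryEncode(xs: Seq[String]): (Map[String, Int], Seq[Int]) = {
  val dict = xs.distinct.zipWithIndex.toMap // e.g. "LongString" -> 0
  (dict, xs.map(dict))
}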
C. Delta Encoding
Delta encoding: it works well for Unix timestamps, which record the number of seconds since January 1, 1970. When storing timestamps, the initial timestamp can be subtracted out to reduce storage; for example, with 1577808000 as the baseline a lot of space can be saved:
[1577808000, 1577808004, 1577808008] -> [0, 4, 8]
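And a toy delta encoder using the first value as the baseline:
// Delta encode: [1577808000, 1577808004, 1577808008] -> [0, 4, 8]
def deltaEncode(xs: Seq[Long]): Seq[Long] = xs.map(_ - xs.head)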
3. Storage - Compression comparison
val st = System.currentTimeMillis()
val pairInfo = (0 to 1000000).zipWithIndex.toArray

val format = "csv" // csv, json, parquet
sc.parallelize(pairInfo).toDF("A", "B")
  .write
  .mode(SaveMode.Overwrite)
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .format(format) // write in the chosen format (save() alone defaults to parquet)
  .save(s"./output/$format")

val saveType = "gzip" // text, gzip
sc.parallelize(pairInfo).saveAsTextFile(s"./output/$saveType", classOf[GzipCodec])

val cost = System.currentTimeMillis() - st
println(s"Time consuming: $cost")
The two snippets above save the array of 0 to 1000000 in the corresponding formats; compare the resulting sizes:

| Format | Text | Gzip | Parquet | CSV | JSON |
| --- | --- | --- | --- | --- | --- |
| Size (MB) | 15.8 | 4.6 | 8 | 13.8 | 23.8 |
Compared with row-oriented CSV and JSON storage, Parquet offers a much higher compression ratio. Amazon has compared CSV and Parquet efficiency on S3 clusters: using Parquet cut the size by 87%, made queries 34 times faster, and saved 99.7% of the cost. So in scenarios with large, growing data volumes and frequent operations on individual columns, Parquet is a very good fit.
4. Read - Efficiency comparison
Then read the above files separately :
val csv = spark.read.csv(path + "/output/csv").rdd.count()
val parquet = spark.read.parquet(path + "/output/parquet").rdd.count()
val json = spark.read.json(path + "/output/json").count()
val common = sc.textFile(path + "/output/common").count()
val gz = sc.textFile(path + "/output/gzip").count()

| Format | Text | Gzip | Parquet | CSV | JSON |
| --- | --- | --- | --- | --- | --- |
| Time (ms) | 1417 | 1448 | 4952 | 6870 | 6766 |
Compared with CSV and JSON, Parquet has the advantage, but against the row-oriented Text and Gzip files a count-style row statistic is clearly not where a columnar file shines, hence the big gap. For big-data statistics over one or a few fields, Parquet will deliver better performance than row-oriented files.
5.selectExpr

Besides getting the original field contents when reading a Parquet file, you can obtain extra information through selectExpr. The underlying functions live in org.apache.spark.sql.functions and include aggregations like collect_list, statistics like count, as well as max, min, isnull and so on.
spark.read.parquet(path).selectExpr("count('_c1')").rdd.foreach(row => {
  println(row.getLong(0))
})
The above uses selectExpr to get count(_c1), the number of values of that feature; the count result here is 5383.
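Several selectExpr expressions can also be combined in one pass (an assumed example, using the default _c0/_c1 column names described below):
spark.read.parquet(path)
  .selectExpr("count('_c1')", "max(_c0)", "min(_c0)")
  .show()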
Here _c1 is the default schema name in the sql.DataFrame obtained from Parquet. The default schema information can be retrieved as follows:
val schema = spark.read.parquet(path).schema
println(schema)
Here is part of it: the feature names start from _c0 and increment, so the defaults are _c0, _c1, and so on. If you define your own schema with StructField and read with spark.read.schema().parquet(), the column names used by selectExpr must be changed to your own names; for example, if I define _c1 as age, the expression above has to become count('age'), and using _c1 again will report an error. For more detailed schema operations see: Parquet specify schema.
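A hedged sketch of that custom-schema path (the field names name/age are made up): the schema is passed in before parquet(), and selectExpr then has to use the defined names.
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val userSchema = StructType(Array(
  StructField("name", StringType),
  StructField("age", StringType)
))

spark.read.schema(userSchema).parquet(path)
  .selectExpr("count('age')") // '_c1' would now fail, the column is called age
  .show()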
Five. Summary
That is the usual way we use Spark with Parquet. SparkSession's integrated API for reading parquet and orc is very convenient; when possible, it is recommended to read directly through that API instead of hadoopRDD / hadoopFile. Finally, the naming of parquet is really fun: parquet means parquet flooring, and the variable-length columns it stores do look a bit like floorboards when laid out flat.
