Analysis of the z-order sampling partition process in Hudi
2022-07-29 08:30:00 【Dipu Technology】
Author: Wuwenchi
Background
In terms of data clustering, Hudi supports reordering data with z-order.
Z-order sorting consists of three main steps:
- For each user-specified z-order field, generate the field's corresponding z value.
- Interleave the bits of the z values generated for all z-order fields to produce the final z value (a minimal interleaving sketch follows this list).
- Sort all data by the final z value.
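To make the second step concrete, here is a minimal sketch of bit interleaving. It is not the actual Hudi implementation; the helper name and its inputs are illustrative assumptions, and each input is assumed to be one field's already-computed z value.

// Minimal sketch of bit interleaving (illustrative only, not Hudi's code).
// Each input is one field's z value, assumed to fit into `bitsPerValue` bits.
def interleaveBits(values: Seq[Long], bitsPerValue: Int): Long = {
  var z = 0L
  // take bits from the highest position down, one bit from each value per round
  for (bit <- bitsPerValue - 1 to 0 by -1; v <- values) {
    z = (z << 1) | ((v >> bit) & 1L)
  }
  z
}

// interleaveBits(Seq(3L, 5L), bitsPerValue = 3) == 27:
// 3 = 011b and 5 = 101b interleave bit by bit into 011011b = 27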
In the first step above, each field's own z value can be generated in two main ways:
- Direct value mapping. This method is simple to implement and easy to understand, but it has flaws:
  - The fields participating in z value generation ideally need to be integers starting from 0 in order to produce a good z-curve; real data sets almost never look like that, so the effect of z-ordering is weakened.
  - For data that shares a common prefix, for example url fields that mostly start with http://www., sorting on the first few characters is meaningless.
- Sample the data first, divide all data into partitions according to the sampled boundaries, and use each record's partition index as its z value. This approach solves both problems of the first method well:
  - Partition indexes are always integers starting from 0.
  - Data sharing a common prefix can still be spread across different partitions according to string ordering, and therefore receives different z values.
The rest of this article walks through the sampling partition process of the second approach.
(The code analysis is based on Hudi's master branch, commit 22c45a7704cf4d5ec6fb56ee7cc1bf17d826315d.)
Sampling partition process
Overall flow chart
(Overall flow chart of the sampling partition process; for the detailed calculations, refer to the code analysis below.)
Code analysis
1. Get the sampling results
// This function samples the rdd and returns the sampling result
// (each sampled record together with the weight assigned to it)
def getRangeBounds(): ArrayBuffer[(K, Float)] = {
  // zEncodeNum: the target number of partitions, user configurable via
  // hoodie.layout.optimize.build.curve.sample.size (default 200000)
  if (zEncodeNum <= 1) {
    ArrayBuffer.empty[(K, Float)]
  } else {
    // samplePointsPerPartitionHint: number of samples per partition, default 20
    // sampleSize: total number of samples required, capped at 1e6
    val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * zEncodeNum, 1e6)
    // rdd.partitions: the partitions of the rdd
    // Assume every partition holds a roughly balanced and fairly large share of the data,
    // and over-collect by a factor of 3 (in practice partitions can be skewed; partitions
    // with too much data are resampled later, but without over-collecting the sample could
    // still fall short), then derive the average number of samples to take per partition
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
    // rdd.map(_._1): keep only the user-specified zorder fields of the rdd
    // sketch samples every partition; see the next section (Sample each partition)
    // Return value:
    //   numItems: total number of records in the rdd
    //   sketched: the data collected from each partition
    val (numItems, sketched) = sketch(rdd.map(_._1), sampleSizePerPartition)
    if (numItems == 0L) {
      ArrayBuffer.empty[(K, Float)]
    } else {
      // If a partition holds far more records than the average sample size
      // (records in the partition * fraction > sampleSizePerPartition), it is resampled
      // later to make sure enough samples are collected from it.
      // fraction: ratio of the required sample size to the total record count,
      // used to decide whether a partition is balanced
      val fraction: Double = math.min(sampleSize / math.max(numItems, 1L), 1.0)
      // candidates collects all the data that is finally sampled
      //   K:     a sampled record
      //   Float: the weight assigned to that record
      val candidates = ArrayBuffer.empty[(K, Float)]
      // imbalancedPartitions records the skewed partitions, which are resampled later
      val imbalancedPartitions = mutable.Set.empty[Int]
      // sketched:
      //   _1: partition index
      //   _2: total number of records in the partition
      //   _3: the records sampled from the partition
      sketched.foreach { case (idx, n, sample) =>
        // Decide whether the partition is imbalanced.
        // Substituting the formulas for fraction and sampleSizePerPartition and simplifying,
        // the condition becomes: (rdd.partitions.length * n) / numItems > 3,
        // i.e. the partition holds more than 3 times the average partition size
        if (fraction * n > sampleSizePerPartition) {
          imbalancedPartitions += idx
        } else {
          // Weight of each sampled record = records in the partition / samples taken from it.
          // The weight is used later to determine the partition boundaries
          val weight = (n.toDouble / sample.length).toFloat
          // Add the samples to the final result set
          for (key <- sample) {
            candidates += ((key, weight))
          }
        }
      }
      if (imbalancedPartitions.nonEmpty) {
        // Resample the imbalanced partitions and recompute the weight of their samples
        val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
        val seed = byteswap32(-rdd.id - 1)
        // Resample with the computed fraction
        val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
        // Compute the weight of the resampled records
        val weight = (1.0 / fraction).toFloat
        // Add the resampled records to the final result set
        candidates ++= reSampled.map(x => (x, weight))
      }
      // Return the sampling result set
      candidates
    }
  }
}
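To make the quantities above concrete, here is a small worked example. All numbers are illustrative assumptions, not Hudi defaults.

// Illustrative numbers only: an rdd with 8 partitions holding numItems = 8,000,000 records,
// zEncodeNum = 1000 target partitions, samplePointsPerPartitionHint = 20.
val sampleSize             = math.min(20.0 * 1000, 1e6)          // 20000.0
val sampleSizePerPartition = math.ceil(3.0 * 20000 / 8).toInt    // 7500
val fraction               = math.min(20000.0 / 8000000, 1.0)    // 0.0025
// A balanced partition with n = 500,000 records keeps its 7,500 reservoir samples,
// each with weight 500000.0 / 7500 ≈ 66.7.
// A skewed partition with n = 4,000,000 records fails the check
// (fraction * n = 10,000 > 7,500, equivalently n > 3 * numItems / 8 = 3,000,000),
// so it is resampled with fraction = 0.0025 and each of its roughly 10,000 new samples
// gets weight 1 / 0.0025 = 400.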
1.1 Sample each partition
def sketch[K: ClassTag](
    rdd: RDD[K],
    sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
  val shift = rdd.id
  // Sample each partition
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
    // Prepare the random seed
    val seed = byteswap32(idx ^ (shift << 16))
    // Use reservoir sampling on the data of this partition
    //   sample: the records sampled from this partition
    //   n:      the total number of records in this partition
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
      iter, sampleSizePerPartition, seed)
    // idx:    index of the current partition
    // n:      total number of input records in the current partition
    // sample: the sample set of the current partition
    Iterator((idx, n, sample))
  }.collect()
  // numItems is the total number of input records across all partitions, i.e. the total
  // record count of the rdd; it is used later to compute the weight of the sampled data
  val numItems = sketched.map(_._2).sum
  // Return:
  //   numItems: total record count of the rdd
  //   sketched: the data sampled from each partition
  (numItems, sketched)
}
1.2 Reservoir sampling method
This algorithm addresses the following problem: given a data stream whose length N is large and only known after all the data has been processed, how can k records be picked uniformly at random while traversing the data only once (O(N))?
The algorithm is simple and consists of two steps:
- Initialize the result set with the first k records of the source data.
- For every record after the first k, compute a random index; if the index is less than k, replace the record at that index in the result set.
def reservoirSampleAndCount[T: ClassTag](
    input: Iterator[T],
    k: Int,
    seed: Long = Random.nextLong())
  : (Array[T], Long) = {
  // input: the input data set, i.e. one rdd partition
  // k: the number of records to sample
  // seed: the random seed
  // The result set, i.e. the sampled records
  val reservoir = new Array[T](k)
  var i = 0
  // Initialize the result set with the first k records
  while (i < k && input.hasNext) {
    val item = input.next()
    reservoir(i) = item
    i += 1
  }
  if (i < k) {
    // Not enough data: return what was collected
    val trimReservoir = new Array[T](i)
    System.arraycopy(reservoir, 0, trimReservoir, 0, i)
    (trimReservoir, i)
  } else {
    // Enough data: start random replacement
    // l: running count of all records seen so far
    var l = i.toLong
    val rand = new XORShiftRandom(seed)
    while (input.hasNext) {
      val item = input.next()
      l += 1
      // Randomly replace a record in the result set:
      // if the random index falls inside the reservoir, replace that slot
      val replacementIndex = (rand.nextDouble() * l).toLong
      if (replacementIndex < k) {
        reservoir(replacementIndex.toInt) = item
      }
    }
    // Return the sampled records and the total record count
    (reservoir, l)
  }
}
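A minimal usage sketch of the function above; the concrete numbers are assumptions for illustration only.

// Sample 5 elements from a stream of 100 integers whose length is not known up front.
val (sample, total) = reservoirSampleAndCount((1 to 100).iterator, k = 5, seed = 42L)
// total == 100; sample holds 5 elements, and every input element had the same
// 5/100 probability of ending up in the reservoir.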
2. Get partition boundaries
def determineBound[K : Ordering : ClassTag](
    candidates: ArrayBuffer[(K, Float)],
    partitions: Int,
    ordering: Ordering[K]): Array[K] = {
  // candidates: the collected samples
  //   K:     a sampled key
  //   Float: the weight of that sampled key
  // partitions: the number of partitions required (the user-configured target)
  // ordering: the sort order
  // Sort the candidates by the sampled key (the first element of the tuple;
  // there is in fact only one field here)
  val ordered = candidates.sortBy(_._1)(ordering)
  // Number of samples
  val numCandidates = ordered.size
  // Total weight of all samples
  val sumWeights = ordered.map(_._2.toDouble).sum
  // Average weight per partition
  val step = sumWeights / partitions
  var cumWeight = 0.0
  var target = step
  // The resulting boundary set
  val bounds = ArrayBuffer.empty[K]
  var i = 0
  var j = 0
  var previousBound = Option.empty[K]
  // Walk the sorted candidates and accumulate their weights into cumWeight; every time
  // the accumulated weight reaches one partition's average weight (step), take the
  // current key as a partition separator, and finally return all separators.
  // Note: duplicate keys are skipped when determining the boundaries.
  while ((i < numCandidates) && (j < partitions - 1)) {
    val (key, weight) = ordered(i)
    // Accumulate the weight of the current partition
    cumWeight += weight
    if (cumWeight >= target) {
      // Skip duplicate keys
      if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
        // Add a partition boundary
        bounds += key
        target += step
        j += 1
        previousBound = Some(key)
      }
    }
    i += 1
  }
  // Return the boundaries
  bounds.toArray
}
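As a quick sanity check of determineBound, here is a usage sketch with made-up candidates (three keys of equal weight split into two partitions); the values are assumptions for illustration.

import scala.collection.mutable.ArrayBuffer

// Made-up candidates: three keys, each sampled with weight 1.0
val candidates = ArrayBuffer(("b", 1.0f), ("a", 1.0f), ("c", 1.0f))
val bounds = determineBound(candidates, 2, Ordering[String])
// sumWeights = 3.0, step = 3.0 / 2 = 1.5; after sorting (a, b, c) the cumulative
// weight first reaches the target 1.5 at "b", so bounds == Array("b"):
// keys up to "b" fall into bucket 0 and keys after "b" into bucket 1
// (exact tie handling depends on how the bounds are later looked up).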
3. Expand partition boundaries
// sampleBounds: the partition boundaries returned above.
// Since zorder can be specified on multiple fields and each field determines its own
// boundary array, first compute the maximum boundary length across all fields
val maxLength = sampleBounds.map(_.length).max
// Walk over each field's boundary array; if it is much shorter than the maximum,
// expand that field's boundaries
val expandSampleBoundsWithFactor = sampleBounds.map { bound =>
  val fillFactor: Int = maxLength / bound.size
  if (bound.isInstanceOf[Array[Long]] && fillFactor > 1) {
    // The current boundary array is too short, so expand it
    val newBound = new Array[Double](bound.length * fillFactor)
    val longBound = bound.asInstanceOf[Array[Long]]
    for (i <- 0 to bound.length - 1) {
      for (j <- 0 to fillFactor - 1) {
        // The expansion step is the reciprocal of fillFactor: e.g. with fillFactor = 3
        // the step is 0.33, so a boundary value of 2 is expanded into 3 boundaries:
        // 2.33, 2.66 and 3
        newBound(j + i * fillFactor) = longBound(i) + (j + 1) * (1 / fillFactor.toDouble)
      }
    }
    (newBound, fillFactor)
  } else {
    (bound, 0)
  }
}
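Finally, a hedged sketch of how such a boundary array is typically consumed: a field value is mapped to the index of the first boundary greater than or equal to it, and that index serves as the field's z value fed into the bit interleaving described at the beginning of the article. The helper below is an illustrative assumption, not the actual Hudi code.

// Hypothetical helper: map a value to its bucket index via binary search over the bounds.
def partitionIndex(value: Double, bounds: Array[Double]): Int = {
  val pos = java.util.Arrays.binarySearch(bounds, value)
  if (pos >= 0) pos else -pos - 1 // insertion point when the value is not an exact boundary
}

// With the expanded bounds from the example in the code above (fillFactor = 3, boundary 2):
// partitionIndex(2.5, Array(2.33, 2.66, 3.0)) == 1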