Analysis of the z-order sampling partition process in Hudi
2022-07-29 08:30:00 【Dipu Technology】
Author: Wuwenchi

Background
In terms of data clustering, Hudi supports rearranging data with a z-order curve.

The z-order sort consists of three main steps (a minimal bit-interleaving sketch follows this list):

- For each z-order field specified by the user, generate a per-field z-value.
- Interleave the bits of all the per-field z-values to produce the final z-value.
- Sort all the data by the final z-value.
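The following is a minimal sketch, not Hudi's actual implementation, of step 2: it interleaves the bits of two per-field z-values (assumed here to be non-negative 32-bit integers) into a single combined z-value.

def interleaveBits(x: Int, y: Int): Long = {
  var z = 0L
  for (i <- 0 until 32) {
    val xBit = ((x >> i) & 1).toLong
    val yBit = ((y >> i) & 1).toLong
    // bit i of x goes to even position 2 * i, bit i of y to odd position 2 * i + 1
    z |= (xBit << (2 * i)) | (yBit << (2 * i + 1))
  }
  z
}
// For example, interleaveBits(3, 5) == 39 (binary 100111).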
In the first step, each field can generate its z-value in two main ways:

- Direct value mapping. This approach is simple to implement and easy to understand, but it has two drawbacks:
  - A field participating in z-value generation should ideally be a non-negative integer starting from 0 in order to produce a good z-curve. Real data sets almost never look like this, so the effectiveness of z-ordering is reduced.
  - For data sharing a common prefix, for example a url field whose values mostly start with http://www., ordering on the leading characters is meaningless.
- Sample the data first, partition all the data according to the sample, and use each record's partition index as its z-value (see the sketch after this list). This approach solves both problems of the first method:
  - Partition indices are always integers starting from 0.
  - Data sharing a common prefix is still distributed across different partitions according to string ordering, so different values get different z-values.
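As a simplified illustration (not Hudi's code) of the second method: once the boundaries for a field have been derived from a sample, the per-field z-value of a record is just the index of the partition its value falls into.

def partitionIndex[K](bounds: Array[K], key: K)(implicit ord: Ordering[K]): Int = {
  // the partition index is the number of boundaries the key is greater than
  bounds.count(b => ord.gt(key, b))
}
// With bounds = Array("http://www.g", "http://www.p"):
//   "http://www.abc" -> 0, "http://www.hudi" -> 1, "http://www.zzz" -> 2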
The rest of this article walks through the second approach: the sampling partition process.
(The code below is taken from Hudi's master branch, commit 22c45a7704cf4d5ec6fb56ee7cc1bf17d826315d.)
Sampling partition process
Overall flow chart
(Overall flow chart of the sampling partition process; for the detailed calculations, see the code analysis below.)

Code analysis
1. Get the sampling results
// This function samples the RDD and returns the sampling result:
// each sampled key together with the weight assigned to it.
def getRangeBounds(): ArrayBuffer[(K, Float)] = {
  // zEncodeNum: the number of target partitions. It is user-configurable through
  // hoodie.layout.optimize.build.curve.sample.size and defaults to 200000.
  if (zEncodeNum <= 1) {
    ArrayBuffer.empty[(K, Float)]
  } else {
    // samplePointsPerPartitionHint: the number of samples per target partition, 20 by default.
    // sampleSize: the total number of samples needed, capped at 1e6.
    val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * zEncodeNum, 1e6)
    // rdd.partitions.length: the number of RDD partitions.
    // Assume the partitions are roughly balanced and each holds a fairly large amount of data,
    // and over-sample by a factor of 3 (in practice partitions can be skewed; heavily loaded
    // partitions are re-sampled later, but the sample may still fall short), then compute the
    // average number of samples to draw from each partition.
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
    // rdd.map(_._1): keep only the user-specified z-order fields.
    // sketch samples every partition; it is described in detail in section 1.1 below.
    // Return values:
    //   numItems: the total number of records in the RDD
    //   sketched: the records sampled from each partition
    val (numItems, sketched) = sketch(rdd.map(_._1), sampleSizePerPartition)
    if (numItems == 0L) {
      ArrayBuffer.empty[(K, Float)]
    } else {
      // If a partition contains far more records than average (records in the partition
      // * fraction > sampleSizePerPartition), it is re-sampled to make sure enough
      // samples are collected from it.
      // fraction: the ratio of the desired sample size to the total record count,
      // used to decide whether a partition is balanced.
      val fraction: Double = math.min(sampleSize / math.max(numItems, 1L), 1.0)
      // candidates holds all of the samples finally collected:
      //   K: the sampled key
      //   Float: the weight assigned to that key
      val candidates = ArrayBuffer.empty[(K, Float)]
      // Partitions with skewed data; they are re-sampled afterwards.
      val imbalancedPartitions = mutable.Set.empty[Int]
      // Each element of sketched is:
      //   _1: the partition index
      //   _2: the total number of records in that partition
      //   _3: the records sampled from that partition
      sketched.foreach { case (idx, n, sample) =>
        // Decide whether this partition is imbalanced.
        // Substituting the formulas for fraction and sampleSizePerPartition and simplifying,
        // the condition becomes roughly: (rdd.partitions.length * n) / numItems > 3,
        // i.e. the partition holds more than three times the average number of records.
        if (fraction * n > sampleSizePerPartition) {
          imbalancedPartitions += idx
        } else {
          // The weight of each sample from this partition =
          //   total records in the partition / number of samples taken from it.
          // The weight is used later to determine the partition boundaries.
          val weight = (n.toDouble / sample.length).toFloat
          // Add the samples to the final result set.
          for (key <- sample) {
            candidates += ((key, weight))
          }
        }
      }
      if (imbalancedPartitions.nonEmpty) {
        // Re-sample the imbalanced partitions and recompute the weight of their samples.
        val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
        val seed = byteswap32(-rdd.id - 1)
        // Re-sample with the ratio computed above.
        val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
        // Compute the weight of the re-sampled records.
        val weight = (1.0 / fraction).toFloat
        // Add these samples to the final result set.
        candidates ++= reSampled.map(x => (x, weight))
      }
      // Return the sample set.
      candidates
    }
  }
}
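To make the imbalance check concrete, here is a worked example with illustrative numbers (not taken from the source): suppose the RDD has 10 partitions, numItems = 1,000,000 and sampleSize = 10,000. Then fraction = 10,000 / 1,000,000 = 0.01 and sampleSizePerPartition = ceil(3 × 10,000 / 10) = 3,000. A partition with n = 350,000 records gives fraction × n = 3,500 > 3,000, so it is flagged as imbalanced; equivalently, rdd.partitions.length × n / numItems = 10 × 350,000 / 1,000,000 = 3.5 > 3, i.e. that partition holds more than three times the average number of records.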
1.1 Sample each partition
def sketch[K: ClassTag](
    rdd: RDD[K],
    sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
  val shift = rdd.id
  // Sample every partition.
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
    // Prepare the random seed.
    val seed = byteswap32(idx ^ (shift << 16))
    // Use reservoir sampling on the records of this partition.
    //   sample: the records sampled from this partition
    //   n: the total number of records in this partition
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
      iter, sampleSizePerPartition, seed)
    // idx: the current partition index
    // n: the total number of input records in the current partition
    // sample: the samples drawn from the current partition
    Iterator((idx, n, sample))
  }.collect()
  // numItems: the total number of input records across all partitions, i.e. the total
  // record count of the RDD, used later to compute the sample weights.
  val numItems = sketched.map(_._2).sum
  // Return:
  //   numItems: the total record count of the RDD
  //   sketched: the records sampled from each partition
  (numItems, sketched)
}
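A hypothetical usage sketch (it assumes a SparkContext named sc is in scope and the sketch function above is accessible): sample at most 5 keys from each of 4 partitions.

val rdd = sc.parallelize(1 to 100, numSlices = 4)
val (numItems, sketched) = sketch(rdd, sampleSizePerPartition = 5)
// numItems == 100; sketched holds one (partitionIndex, recordCount, samples) triple per partition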
1.2 Reservoir sampling method
This algorithm solves the following problem: given a data stream whose length N is large and not known until all the data has been processed, how can k records be selected uniformly at random while traversing the data only once (O(N))?

The algorithm is simple and consists of two steps:

- Initialize the result set with the first k records of the source data.
- For every record after the first k, draw a random index in [0, number of records seen so far); if that index is smaller than k, replace the record at that index in the result set.
def reservoirSampleAndCount[T: ClassTag](
    input: Iterator[T],
    k: Int,
    seed: Long = Random.nextLong())
  : (Array[T], Long) = {
  // input: the input data set, i.e. one partition of the RDD
  // k: the number of records to sample
  // seed: the random seed
  // reservoir: the result set, i.e. the sampled records
  val reservoir = new Array[T](k)
  var i = 0
  // Initialize the result set with the first k records.
  while (i < k && input.hasNext) {
    val item = input.next()
    reservoir(i) = item
    i += 1
  }
  if (i < k) {
    // Not enough records; return what was collected.
    val trimReservoir = new Array[T](i)
    System.arraycopy(reservoir, 0, trimReservoir, 0, i)
    (trimReservoir, i)
  } else {
    // Enough records; start the random replacement phase.
    // l: the total number of records seen so far.
    var l = i.toLong
    val rand = new XORShiftRandom(seed)
    while (input.hasNext) {
      val item = input.next()
      l += 1
      // Draw a random index in [0, l); if it falls inside the result set,
      // replace the record at that index.
      val replacementIndex = (rand.nextDouble() * l).toLong
      if (replacementIndex < k) {
        reservoir(replacementIndex.toInt) = item
      }
    }
    // Return the sampled records and the total record count.
    (reservoir, l)
  }
}
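A hypothetical usage sketch (assuming the function above is in scope): draw 10 random elements from a stream of 1000 integers in a single pass.

val (sample, total) = reservoirSampleAndCount((1 to 1000).iterator, k = 10, seed = 42L)
// total == 1000; sample.length == 10, and every element of the stream had the same
// probability of ending up in the sample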
2. Get the partition boundaries
def determineBound[K : Ordering : ClassTag](
    candidates: ArrayBuffer[(K, Float)],
    partitions: Int,
    ordering: Ordering[K]): Array[K] = {
  // candidates: the sampled data
  //   K: a sampled key
  //   Float: the weight of that sample
  // partitions: the required number of partitions (as configured by the user)
  // ordering: the sort order
  // Sort the candidates by the first field (there is only one field here).
  val ordered = candidates.sortBy(_._1)(ordering)
  // The number of samples.
  val numCandidates = ordered.size
  // The total weight.
  val sumWeights = ordered.map(_._2.toDouble).sum
  // The average weight per partition.
  val step = sumWeights / partitions
  var cumWeight = 0.0
  var target = step
  // The boundary result set.
  val bounds = ArrayBuffer.empty[K]
  var i = 0
  var j = 0
  var previousBound = Option.empty[K]
  // Traverse the sorted candidates, accumulating their weights into cumWeight.
  // Every time the accumulated weight reaches one partition's average weight (step),
  // take the current key as a partition boundary; finally return all boundaries.
  // Note: duplicate keys are skipped when building the boundaries.
  while ((i < numCandidates) && (j < partitions - 1)) {
    val (key, weight) = ordered(i)
    // Accumulate the weight of the current partition.
    cumWeight += weight
    if (cumWeight >= target) {
      // Skip duplicate keys.
      if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
        // Add a partition boundary.
        bounds += key
        target += step
        j += 1
        previousBound = Some(key)
      }
    }
    i += 1
  }
  // Return the boundaries.
  bounds.toArray
}
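A hypothetical call (assuming the function above is in scope) that splits six equally weighted samples into three partitions; the total weight is 6.0, so the step is 2.0 and a boundary is emitted when the cumulative weight reaches 2.0 and 4.0.

val candidates = ArrayBuffer(("a", 1.0f), ("b", 1.0f), ("c", 1.0f),
  ("d", 1.0f), ("e", 1.0f), ("f", 1.0f))
val bounds = determineBound(candidates, 3, implicitly[Ordering[String]])
// bounds == Array("b", "d"): keys <= "b" fall into partition 0, keys in ("b", "d"]
// into partition 1, and keys > "d" into partition 2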
3. Expand the partition boundaries
// sampleBounds: the partition boundaries returned above.
// Since z-order can be built on multiple fields and each field defines its own boundary
// array, first compute the maximum boundary length among all fields.
val maxLength = sampleBounds.map(_.length).max
// Traverse each field's boundary array; if it is much shorter than the longest one, expand it.
val expandSampleBoundsWithFactor = sampleBounds.map { bound =>
  val fillFactor: Int = maxLength / bound.size
  if (bound.isInstanceOf[Array[Long]] && fillFactor > 1) {
    // The current boundary array is too short, so expand it.
    val newBound = new Array[Double](bound.length * fillFactor)
    val longBound = bound.asInstanceOf[Array[Long]]
    for (i <- 0 to bound.length - 1) {
      for (j <- 0 to fillFactor - 1) {
        // The expansion step is the reciprocal of fillFactor.
        // For example, with fillFactor = 3 the step is 0.33, so a boundary value of 2
        // expands into three boundaries: 2.33, 2.66 and 3.
        newBound(j + i * fillFactor) = longBound(i) + (j + 1) * (1 / fillFactor.toDouble)
      }
    }
    (newBound, fillFactor)
  } else {
    (bound, 0)
  }
}
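As a worked example with illustrative values: if the z-order is built on two fields and the first field produces 6 boundaries while the second produces only 2, then maxLength = 6 and the second field's fillFactor = 6 / 2 = 3. A Long boundary array of [2, 9] is expanded to approximately [2.33, 2.67, 3.0, 9.33, 9.67, 10.0], so both fields end up with boundary arrays of the same length.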