
A Spark-Based Distributed Parallel Processing Optimization Strategy

2022-06-26 09:19:00 Merrill Lynch data tempodata

When Spark reads relational model data through JDBC, it executes the read as a single-threaded task by default. With large data volumes, this frequently causes memory overflow and poor performance, and repartitioning after an enlarged-memory read costs extra time and wastes resources.
Reading relational model data concurrently therefore increases the parallelism of task processing, reduces the data volume each task must handle, and improves overall processing efficiency.

Distributed concurrent processing optimization

(1) General idea
To read a relational model concurrently, a partition field is selected first, and the interval key values between concurrent partitions are derived from the field's type and the number of partitions. Assuming these key values divide the model data roughly evenly into multiple logical partitions, query conditions built from the key values read the model data concurrently. The key points are:
1. Rule for selecting the partition field
(a) Tentatively choose the first character or numeric field in the model.
2. Number of partitions
(a) Provide a default number of partitions; after read/write testing at the 10-million-row scale, give a recommended resource allocation and default partition count.
(b) Allow users to customize the configuration.
3. Static partition policy
(a) Numeric type: convert the value to a character string in reverse order; determine the interval key values between concurrent partitions from the character range at each position and the number of partitions, then construct the partitions.
(b) Character type: in reverse order, determine the interval key values between concurrent partitions from the value range of a single character and the number of partitions, then construct the partitions.
(2) Overall processing flow
The overall processing flow is shown in the figure:
[Figure: distributed concurrent processing flow]

Validation rule for the number of partitions: the number must fall within [1, range]; a value below the lower limit is treated as 1 partition, and a value above the upper limit is treated as range. The maximum supported number of partitions (range) is 64^4 for character types and 10000 for numeric types.
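A minimal sketch of this clamping rule (the names normalizePartitions and maxRange are illustrative, not from the original design):

// Clamp the requested partition count into [1, maxRange].
// maxRange is 64^4 for character fields and 10000 for numeric fields.
def normalizePartitions(numPartitions: Int, maxRange: Int): Int =
  math.min(math.max(numPartitions, 1), maxRange)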
(3) Threshold-range concurrent reads
Threshold-range concurrent reading suits models whose partition field is numeric.
Key parameters:
partitionColumn: name of the partition field
lowerBound: lower bound of the value range
upperBound: upper bound of the value range
numPartitions: number of partitions
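These parameters map onto Spark's built-in range-partitioned JDBC read. A sketch, with placeholder host, table, column, and bounds:

val url = "jdbc:mysql://host:3306/test"
val prop = new java.util.Properties
prop.setProperty("user", "***")
prop.setProperty("password", "***")
prop.setProperty("driver", "com.mysql.jdbc.Driver")
// Spark splits [lowerBound, upperBound) on partitionColumn into
// numPartitions ranges and issues one JDBC query per partition.
val df = spark.sqlContext.read.jdbc(
  url, "tname",
  "id",      // partitionColumn: the numeric partition field
  0L,        // lowerBound
  1000000L,  // upperBound
  8,         // numPartitions
  prop)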
(4) Default concurrent read
The default concurrent read applies to partition fields of both character and numeric types. It derives filter conditions that split the type's value range approximately evenly, assigns data to different logical partitions by those conditions, and executes the reads concurrently to improve read efficiency.
1. Concurrent model reads
The concurrent read design calls different interfaces depending on the number of partitions.
► The number of partitions is 1

val url = "jdbc:mysql://host:3306/test"
val prop = new java.util.Properties
prop.setProperty("user", "***")
prop.setProperty("password", "***")
prop.setProperty("driver", "com.mysql.jdbc.Driver")
// Single-partition read: one JDBC connection, one task.
val df = spark.sqlContext.read.jdbc(url, "tname", prop)

url is the database connection string.
tname is the name of the table to query; a subquery with filter conditions is also supported, in the form:

(select * from ronghe_mysql_bigint_50w where cast(RY_YGGH as UNSIGNED) > 250000) tmp

prop holds the database connection configuration: user name, password, driver, and so on.
► The number of partitions is greater than 1

val url = "jdbc:mysql://host:3306/test"
val prop = new java.util.Properties
prop.setProperty("user", "***")
prop.setProperty("password", "***")
prop.setProperty("driver", "com.mysql.jdbc.Driver")
// Multi-partition read: one task per element of `predicates`.
val df = spark.sqlContext.read.jdbc(url, "tname", predicates, prop)

The multi-partition concurrent read adds the pre-partition conditions as an extra parameter to the single-partition call.
Here predicates holds the partition pre-partition conditions, an Array[String]; when reading, each element filters the data for one partition.
2. Partition pre-partition conditions
The partition pre-partition conditions are an array of strings, each composed of one filter condition.

val predicates = Array[String](
  " cols < '3'",
  " cols >= '3' and cols < '6'",
  " cols >= '6'"
)

Each pre-partition condition consists of a partition condition column and comparison values. The condition column is built from the selected partition field and an operation on it; the comparison values are the static partition interval values. Because ordered numeric values, and character values as used in business scenarios, tend to be similar in the high-order positions and differ clearly in the low-order ones, the partition field is processed in reverse order.
Assume the partition field is splitCol.
When splitCol is numeric: the partition condition column cols is reverse(cast(splitCol as char)).
When splitCol is a character type: the partition condition column cols is reverse(splitCol).
Suppose the partition interval values are splitKeys (Array[String]) of length L. The comparison conditions are constructed as left-closed, right-open intervals (a sketch of this construction follows the list):
The first condition is cols < splitKeys(0);
the second condition is cols >= splitKeys(0) and cols < splitKeys(1);
the i-th condition is cols >= splitKeys(i-2) and cols < splitKeys(i-1);
the last condition is cols >= splitKeys(L-1).
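A minimal sketch of assembling the predicates from splitKeys (the function name buildPredicates is illustrative; cols stands for the reversed column expression described above):

// Build left-closed, right-open predicate strings from the interval values.
// For a numeric field, cols would be "reverse(cast(splitCol as char))".
def buildPredicates(cols: String, splitKeys: Array[String]): Array[String] = {
  val first = s"$cols < '${splitKeys.head}'"
  val middle = splitKeys.sliding(2).collect {
    case Array(lo, hi) => s"$cols >= '$lo' and $cols < '$hi'"
  }.toArray
  val last = s"$cols >= '${splitKeys.last}'"
  (first +: middle) :+ last
}

For splitKeys = Array("3", "6") this yields the three conditions shown in the example above.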
3. Number of partitions
In the concurrent read design, each partition interval value is represented by four characters, so the expressible range is the fourth power of the number of values each character can take.
With 64 possible values per character, and 10 possible values per digit, the maximum supported number of partitions (range) is 64^4 for character types and 10^4 = 10000 for numeric types.
4. Computing the static partition interval values
Approach
Find the partition interval values within the character range of the field type, i.e., find the points that divide the represented range approximately evenly. (Assume each interval value is represented by four characters and N partitions are required.)
Finding interval values for numeric types:
(1) Each digit takes a value in [0,9] (decimal points are ignored for now; such values are assigned, position by position, to the partition below '0'). The representable range is 1, 2, 3, ..., 9998, 9999.
(2) Compute the size S of each partition: the size of the range divided by the number of partitions (10^4/(N-1)).
(3) Then S-1, 2S-1, 3S-1, ..., (N-1)*S-1 are the interval values that divide the four-digit range evenly.
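A minimal sketch of the numeric case (the function name and the zero-padding to four characters are assumptions):

// Interval values that split the four-digit range into N partitions.
// Each key is zero-padded to four characters, e.g. "2499".
def numericSplitKeys(n: Int): Array[String] = {
  val s = 10000 / (n - 1)  // size of each range: 10^4 / (N-1)
  (1 until n).map(i => f"${i * s - 1}%04d").toArray
}

For n = 5 this gives Array("2499", "4999", "7499", "9999").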
Finding interval values for character types (value range of 64 characters, optimized algorithm):

(1) Based on how commonly characters are used, the value range of each character of an interval value is fixed as: Array('.', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '~')
(2) Infrequently used characters are grouped into the nearest partition, and Chinese characters are assigned to the last partition, so that rare characters do not occupy a partition exclusively and waste resources.
(3) Compute the size S of each segment: the size of the range divided by the number of partitions (64^4/(N-1)).
(4) To find the interval value between equal-length character strings, interpret the interval value as a base-64 number (obtainable with shift operations); then S-1, 2S-1, 3S-1, ..., (N-1)*S-1 are the numbers that divide the four-character range approximately evenly, and the string formed by the characters of the value-range array at each digit of such a number is the interval value.

Converting a decimal number to base 64, taking the decimal number keyInt as an example; tmp is the result array after conversion, and digitsNum is the number of digits, 4:

for (j <- 1 to digitsNum) {
    tmp(digitsNum - j) = charactors(keyInt & (charLength - 1)) // low 6 bits index the 64-character table
    keyInt >>= 6                                               // shift out the consumed base-64 digit
}

Finding interval values for character types (arbitrary characters in the value range, general algorithm):
This follows the same idea as the character-type search above, but is not limited by the size of the value range.
(1) Based on how commonly characters are used, determine the value range of each character of an interval value as Array(......), with m elements.
(2) Infrequently used characters are grouped into the nearest partition, and Chinese characters are assigned to the last partition, so that rare characters do not occupy a partition exclusively and waste resources.
(3) Compute the size S of each segment: the size of the range divided by the number of partitions (m^4/(N-1)).
(4) To find the interval value between equal-length character strings, interpret the interval value as a base-m number; then S-1, 2S-1, 3S-1, ..., (N-1)*S-1 are the numbers that divide the four-character range approximately evenly, and the strings corresponding to these numbers are the interval values (the string formed by the characters of the value-range array at each digit of the number is the interval value).
Converting a decimal number to base m, taking the decimal number keyInt as an example; tmp is the result array after conversion, and digitsNum is the number of digits, 4:

for (j <- 1 to digitsNum) {
    tmp(digitsNum - j) = charactors(keyInt % m) // remainder indexes the m-character table
    keyInt = math.floor(keyInt / m).toInt       // integer-divide out the consumed digit
}
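Putting the pieces together, a self-contained sketch of the general algorithm (the function name, the Long arithmetic, and the parameter defaults are assumptions):

// Interval values that split the four-character range over an m-character
// alphabet into N approximately equal partitions.
def charSplitKeys(charactors: Array[Char], n: Int, digitsNum: Int = 4): Array[String] = {
  val m = charactors.length
  val s = math.pow(m, digitsNum).toLong / (n - 1) // size of each range: m^4 / (N-1)
  (1 until n).map { i =>
    var keyInt = i * s - 1                        // the i-th split point as a number
    val tmp = Array.fill(digitsNum)(charactors(0))
    for (j <- 1 to digitsNum) {                   // base-m conversion, low digit first
      tmp(digitsNum - j) = charactors((keyInt % m).toInt)
      keyInt /= m
    }
    new String(tmp)
  }.toArray
}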

Test results

On the data asset platform, synchronization performance tests were run with 500 thousand and 10 million rows of data; the results are shown in the table below:
[Table: synchronization performance test results]

Summary and outlook

Reading data concurrently by partition field effectively improves data processing capacity, but the effect varies with the partition field's value range and data distribution. The partitioning policy will continue to be optimized to meet the performance requirements of various business scenarios.


Copyright notice: this article was created by [Merrill Lynch data tempodata]; please include the original link when reposting:
https://yzsam.com/2022/02/202202170552315744.html