Spark stage and shuffle for daily data processing
2022-06-24 07:27:00 【Something new】
Spark Stage, DAG(Directed Acyclic Graph)
- Spark divides Stages based on the DAG generated for a Job.
- DAG: we learned the concept of a directed acyclic graph (Directed Acyclic Graph) in discrete mathematics. In our production environment, the jobs I write are only at the level of directed trees (Directed tree); I have not yet run into a true directed acyclic graph. But you can imagine that code using RDD join operators could well produce a genuinely acyclic DAG. For the log processing our team does, the logical topology is mostly of directed-tree complexity.
- PS: a directed tree is always a directed acyclic graph, but a directed acyclic graph is not always a directed tree; you can picture the difference yourself. Abstracting a process into a topology makes it much easier to layer optimizations on top of it, instead of writing the result of every step back to storage the way Hadoop MapReduce does, which wastes a great deal of I/O.
- Our business scenario is like this: the raw collected logs are cut into small fields and put in order, an operation I call normalization, and then a series of operations is performed on the normalized data.
real_data.map(deal_data_func).reduceByKey(merge_data_func).foreachRDD(store_data_func)
- Inside store_data_func, foreachPartition is used to connect to the storage medium. In Spark, this kind of method is called an action.
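As a concrete illustration of that pattern (a minimal sketch, not our project's real code; connect_to_storage and its put call are hypothetical placeholders for whatever storage medium is used):

```python
def store_data_func(rdd):
    def store_partition(records):
        # One connection per partition instead of one per record.
        conn = connect_to_storage()          # hypothetical helper for the storage medium
        try:
            for key, value in records:
                conn.put(key, value)         # hypothetical write call
        finally:
            conn.close()

    # foreachPartition is an action: calling it is what actually runs the job.
    rdd.foreachPartition(store_partition)
```

Opening one connection per partition, rather than one per record or a single one on the driver, is the usual reason foreachPartition is chosen at this step.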
RDD Methods
- RDD methods come in two kinds: transformation and action. Only when an action is called does Spark actually submit the job to the DAGScheduler, which then assigns tasks to the TaskScheduler.
- If your Spark project only performs transformations and never submits an action, Spark will do nothing at all! Code like real_data.map(deal_data_func).reduceByKey(merge_data_func) on its own is not rare in Spark projects and can even look complete. This is one of the biggest differences from MapReduce: MapReduce has no notion of Stage division, and many people who learned from old MapReduce-style code online fall into exactly this misunderstanding when they start with Spark. A toy demonstration of this laziness follows below.
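The following toy PySpark snippet (not the project code) shows that transformations alone only build the lineage; the job is submitted only once an action such as collect() is called:

```python
from pyspark import SparkContext

sc = SparkContext("local[2]", "lazy-demo")

logs = sc.parallelize(["a 1", "b 2", "a 3"])

# Transformations only: Spark records the lineage, nothing executes yet.
pairs = logs.map(lambda line: (line.split()[0], int(line.split()[1])))
summed = pairs.reduceByKey(lambda x, y: x + y)

# The action is what actually submits the job to the DAGScheduler.
print(summed.collect())   # [('a', 4), ('b', 2)] (order may vary)
```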
- The reason Spark performs the actual computation only after an action is submitted is to make full use of dividing the DAG into Stages, whose advantages include, but are not limited to, reduced computation and reduced I/O load.
- Among the many transformation operations, as mentioned in the previous article, there are two categories: wide dependencies (reduceByKey, ...) and narrow dependencies (map, flatMap, ...).
- The latter is much simpler than the former: it just applies a mapping to each record within every Partition, and the number of Partitions does not change.
- The former is a little more complicated, because in this type of operation the goal is an extraction over the global data (for example, adding up the values of the same key); when the amount of data is too large to fit on one machine, we need Spark to schedule, split, and redistribute the Partitions and the data.
- The Partition count of the new RDD produced by a wide dependency is the biggest puzzle and black box for beginners (myself included). One day I finally could not resist and went through the source code, taking reduceByKey as an example:

        // reduceByKey has three overloads; the signatures are clear at a glance
        1. def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V])
        2. def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int)
        3. def reduceByKey(func: JFunction2[V, V, V])

- What we use most is the shortest one, overload (3) with a single parameter. Overload (2) has one extra parameter, numPartitions, which means we can specify the Partition count ourselves, so the common claim online that the Partition count is "generated by Spark itself" is somewhat misleading; still, this overload is only useful once you understand how Spark schedules. Overload (1) is the focus this time: its first parameter is a value of type Partitioner. We can guess that when we call (3), specifying neither numPartitions nor a Partitioner, there must be some default that determines the Partition count after reduceByKey.
- Keep digging through the source: in the implementation of (3) we see a defaultPartitioner being instantiated and passed to (1):

        fromRDD(reduceByKey(defaultPartitioner(rdd), func))

        // Signature: it can be seen that at least one RDD must be passed in
        def defaultPartitioner(rdd: RDD[_], others: RDD[_]*)
- defaultPartitioner is a built-in Spark implementation; it holds the logic that determines the new RDD's Partitions:
  - if spark.default.parallelism is not set, the new RDD's Partition count is the largest Partition count among the RDDs passed in (one or more);
  - if spark.default.parallelism is set, the new RDD's Partition count is the value of that parameter.

        val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
          rdd.context.defaultParallelism
        } else {
          rdds.map(_.partitions.length).max
        }
- The spark.default.parallelism parameter is set when the Spark application is initialized and is stored in the SparkContext; it is familiar to anyone who uses Spark. In general this value is set to Executor count * cores per Executor * 2.
- Now we understand where the Partition count comes from. For the more detailed source-level behaviour, you can read the Partitioner.scala file in Spark Core; it is very concise. A small runnable check of the rule is sketched below.
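A minimal PySpark check of that rule (a sketch; it assumes the Spark 2.x behaviour quoted from the source above, and that no upstream RDD already carries a partitioner):

```python
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[4]").setAppName("partition-count-demo")
conf.set("spark.default.parallelism", "6")       # explicitly set, so it wins
sc = SparkContext(conf=conf)

a = sc.parallelize(range(100), 8).map(lambda x: (x % 10, 1))

b = a.reduceByKey(lambda x, y: x + y)            # no numPartitions, no Partitioner
print(b.getNumPartitions())                      # 6: taken from spark.default.parallelism

c = a.reduceByKey(lambda x, y: x + y, 3)         # numPartitions specified explicitly
print(c.getNumPartitions())                      # 3
```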
- The number of Partitions in an RDD is important because it determines, to a large extent, Spark's degree of concurrency. As mentioned in the previous article, an RDD's Partitions correspond one-to-one with the tasks of the Stage it belongs to; this is also where the name of the spark.default.parallelism parameter comes from.
- In Spark's patches, the choice of Partition count has always been a hot topic; if you are interested, take a look at this patch (https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-6377). Up to Spark 2.3.2 the conclusion above still holds, but Spark SQL already contains code for dynamically adjusting the Partition count, behind spark.sql.adaptive.enabled=true.
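For reference, this is roughly how that switch is turned on (a sketch; spark.sql.adaptive.enabled only affects Spark SQL / DataFrame shuffles, not plain RDD jobs, and its exact behaviour varies by version):

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("aqe-demo")
         # Let Spark SQL adjust shuffle partition counts at runtime.
         .config("spark.sql.adaptive.enabled", "true")
         .getOrCreate())
```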
Stage Division
- Actually Stage division should be the easiest part to understand, and you do not need to dig into it at the source level. In practice, what we need to pay attention to most is when a Shuffle will happen, and Stage division is precisely about finding out where a Shuffle must occur. A Shuffle means that data may migrate between different nodes, be written from memory to files, and be read back from files into memory, a series of costly operations; in more than 90% of scenarios, the less Shuffle the better.
- One way to achieve this is by choosing different transformations. The most classic is replacing groupByKey with reduceByKey, which I will not belabor; the principle is that reduceByKey aggregates the local data first and only then transfers it to other nodes, reducing the Shuffle volume (a minimal comparison is sketched below).
- A Stage in Spark is essentially a set of tasks that can execute in parallel, and the criterion for dividing Stages is the appearance of a wide dependency. Take the example at the beginning of the article as the prototype.
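A minimal comparison of the two (toy data, reusing the sc from the earlier sketch): both compute the same per-key sums, but reduceByKey combines values inside each partition before the Shuffle, so far less data crosses the network.

```python
pairs = sc.parallelize([("a", 1), ("a", 2), ("b", 3)] * 1000, 4)

# groupByKey shuffles every single (key, value) record, then sums on the other side.
sums_grouped = pairs.groupByKey().mapValues(sum)

# reduceByKey pre-aggregates within each partition, then shuffles one value per key per partition.
sums_reduced = pairs.reduceByKey(lambda x, y: x + y)

assert sorted(sums_grouped.collect()) == sorted(sums_reduced.collect())
```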
- As you can see from the diagram, the Shuffle starts when reduceByKey is executed. Assuming your Spark deployment is a cluster with many nodes:
  - First, reduceByKey is performed locally, producing <key, value> pairs that are unique only within that node.
  - Then comes Shuffle-Write (e.g. writing to disk); the DAGScheduler selects which data is allocated to which node (decided by the default partitioner).
  - Then the destination node performs Shuffle-Read (e.g. reading over the network) and actively pulls the data.
  - Finally the data is merged; at this point, any key on any node is globally unique.
- As can be seen from the above, to reduce the cost of Shuffle, besides reducing the number of Shuffles you should also try to reduce the amount of data moved in each Shuffle; one small example is sketched below.
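One way to shrink what a Shuffle has to move (a sketch with a hypothetical log layout and a hypothetical raw_logs RDD of text lines): drop unneeded records and keep only the fields you aggregate on before the wide dependency, so only the trimmed pairs are shuffled.

```python
# Hypothetical log layout: "timestamp level user bytes ..."
def to_pair(line):
    fields = line.split()
    return (fields[2], int(fields[3]))               # keep only the two fields we aggregate on

slim = (raw_logs                                     # hypothetical input RDD of raw log lines
        .filter(lambda line: " DEBUG " not in line)  # discard records early, before the shuffle
        .map(to_pair))                               # narrow operations: no shuffle yet

per_user_bytes = slim.reduceByKey(lambda x, y: x + y)  # the shuffle now moves only (user, bytes) pairs
```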
- After the Shuffle, our project scenarios generally need to store the computed results. To a certain extent, how the results are stored determines whether this batch of tasks can truly be considered finished; it can roughly be divided into in-place storage and centralized storage, which will be covered in detail in the next chapter.
[Extra] A brief look at Shuffle Read & Write
These details are not actually very helpful for real project work; they are just for getting to know Spark internals. All you really need to know is that the various kinds of I/O brought by Shuffle are unavoidable, and that Spark keeps adding optimization algorithms to reduce this part of the cost. To set the scene, assume we use the default storage medium, so Shuffle Write means writing data to the local disk.
- When a record has gone through the various normalization steps and the last narrow-dependency transformation has been called (still taking the example above as the background), the first thing that happens is Shuffle Write. Spark first confirms the Partitioner to be used this time; as seen above, a partitioner has two functions:
  - determine the number of Partitions of the new RDD;
  - decide which Partition each piece of data is placed in.
- Once Spark has determined the number of Partitions:
  - First it uses its internal algorithm to perform reduceByKey on the local data.
  - Then it creates new temporary files locally; depending on the circumstances (for example the Partition count, serialization, and so on), a different Shuffle Write algorithm is chosen to write the intermediate results out to disk.
  - According to the Partitioner, it decides which Partition each key's data belongs to and sorts by partition id in memory; when memory runs out, it spills to disk together with an index file that identifies the data of the different partitions (this file is ordered). A toy version of this key-to-partition mapping is sketched after this list.
  - Finally, when the other end is ready to pull the data, the data belonging to the same partition but scattered across different files is merged and handed over to the other end.
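To make the key-to-partition step concrete, the default HashPartitioner essentially does the following (a simplified Python sketch of the idea, not Spark's actual source, which works on the JVM hashCode):

```python
def partition_for_key(key, num_partitions):
    # Non-negative modulo of the key's hash, as in Spark's HashPartitioner.
    return (hash(key) % num_partitions + num_partitions) % num_partitions

print(partition_for_key("user_42", 8))   # some index in [0, 8)
```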
- In the figure, the tasks at (1) correspond one-to-one with the Partitions of the input RDD, and a merge is done at Stage (3). The tasks of Stage (4) are the Shuffle Read tasks on the remote end; their number is the same as, and corresponds one-to-one with, the Partitions of the new RDD.
There are far too many details to go into here, because Shuffle Write has many algorithms, and Spark chooses which one to use for writing out the files, according to the situation, in order to reduce the performance cost. The scenario described above is just one of them, SortShuffle.