当前位置:网站首页>spark学习(二)--作业调度和shuffle解析

spark学习(二)--作业调度和shuffle解析

2020-11-09 13:13:00 IT迷途小书童

(一)任务(作业)调度

任务调度相关的3个概念:jobstagetask

  1. Job:根据用的的spark逻辑任务。以action方法为界,遇到一个action 方法,则触发一个job
  2. Stagestagejob的子集。以宽依赖(shuffle)为界。遇到遇到一个shuffle,做一次划分。
  3. Tasktaskstage的子集。以并行度(分区数)为区分。分区数多少个,就多少个task

 

Spark的任务调度。整体而言分两路进行。Stage级的调度和task级的调度。整理描述如下。

1. 调度

Driver初始化sparkContext的过程中,会初始化DAG SchedulerTask SchedulerScheduler BackEndHeatBeatReceiver 并启动SchedulerBackEndheatBeatReceiver

 

SchedulerBackEnd负责通过AMRM申请资源,并不断的从Task SchedulerTask。分发到executor 执行。

 

heatBeatReceiver负责接收Executor 的心跳。监控executor的生存状态并通知Task Scheduler

 

(1) Spark通过transformation 方法形成RDD的血缘图。即构建了DAGstage的有向无环图)。

(2) DAG Scheduler 负责调度DAG中的stage。将每个stage打包成task Set集合。交给Task Scheduler负责Task的调度。

(3) Task Scheduler DAG Scheduler 传递过来的Task set 按照调度策略,分发给 在driver注册的 executor 上执行。

2. Stage 级调度

DAG 调用方法栈

 

 

 

 

(1) 创建DAGSparkContext Job交给DAG Scheduler执行。DAG SchedulerJob切分成stage。形成stage的有向无环图DAG

 

    切分依据:宽依赖。最终RDD根据回溯判断父RDD是否有shuffle。窄依赖的RDD划到同一个stage中,宽依赖的根据宽依赖,分成两个不同的stage

 

Stage 分类以及区别:分为两种。Result Stage shuffle Stage

Shuffle Stage 的完成标志是计算结果写磁盘完成。

Result Stage的完成意味者着一个job的完成。

 

(2) 执行DAGDAG Scheduler 根据提交策略提交stage。将需要提交的stage打包成task set 。调用Task Scheduler执行Task Set

 

Stage提交策略:一个stage是否提交,判断父stage是否执行。只有父stage都执行完毕后才执行当前stage。如果一个stage没有父stage,则从当前stage开始执行。

 

3. task级调度

Task的调度方法栈

   

 

 

 

 

Scheduler Back End 功能 负责Task的具体执行。他管理executor的资源。Task执行的时候,需要调用scheduler Back End ,获取资源。并分发任务执行。

 

Task调度策略:

  1. FIFO:先进先出。默认的调度策略。
  2. FIAR:(公平调度策略)

 

(1) 封装Task MangerTask SchedulerDAG Scheduler 传递的Task Set封装成Task Manger。加入到调度队列中。

(2) 调度Task MangerTask Manger调用schedulerBackEnd,获取活跃的executor ,将任务分发给executor 进行执行。

 

(二)SHUFFLE解析

(1) Shuffle 核心要点
  1. Shuffle 任务的个数

Shuffle 分为map阶段和reduce阶段。或者称之为 shuffle read阶段和shuffle write阶段。从spark UI上可以清楚看到。

  1. Map端的分区数是原文件本来的分区数。即split的个数。

 

(2) Hash shuffle
A. 未经优化的hash Shuffle

假设一次shuffle 阶段有MMapNreduce组成。本次实例为4nap3reduce Shuffle read阶段。每个map会产生N个文件。Shuffle就会产生M*N个文件。

Shuffle write阶段。Nreduce会去每个map函数中获取自己对应的文件。加载到内存进行reduce

 

 

 

 

 

B. 优化后的hash Shuffle

假设一次shuffle 阶段有MMapNreduce组成。本次实例为4nap3reduce。当前假设的是每个executor为单核。同一个executor分配两个map task

 

优化后的hash shuffle 原理是:不管一个executor 分配到多少个map函数。并不是每个map产生N个文件。而是同一个core 处理的一批maptask生成N个文件。所以。一个executor产生的文件数是。(单个executorcore数)*N 。也是当前执行mapexecutorcore*N

 

比较:有原来的M*N 变化成了core*N。通常而言。Core是比较确定的数量,不会太大。相比M会比较大。

 

优化后的hash shuffle原理图如下。

 

 

 

 

(3) Sort shuffle

现在的shuffleManager默认是 sort shuffleshuffle read task的数量小于等于spark.shuffle.sort. bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制

A. 普通机制

 

 

 

 

普通机制的前部分和未经优化的hash shuffle类型。先放缓存。溢写的时候,根据key分配到不同的reduce对应的缓存中。在进入缓存的过程中先排序。各自缓存满的时候溢写到对应的磁盘文件为M*N个中间文件。

最终完成的时候,每个map对应N个文件进行merge。因为Nreduce都需要访问当前文件。所以需要对应一个索引文件,指明那个reduce在当前文件中的数据的位置【start offset end offset

B. ByPass机制

 

 

 

 

ByPass 模式和普通sort shuffle 不同点是:map处理之后,直接生成N个缓存,达到阈值后溢写N个磁盘。中间缺少一个共同的缓存。也缺少sort 排序。

版权声明
本文为[IT迷途小书童]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/maopneo/p/13948047.html