Spark operators: the coalesce operator
2022-08-05 06:11:00 【zdaiqing】
Full code
```scala
// Excerpt from org.apache.spark.rdd.RDD; it depends on Spark internals and does not compile on its own
/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions. If a larger number
 * of partitions is requested, it will stay at the current number of partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * @note With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner. The optional partition coalescer
 * passed in must be serializable.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](
        mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
        new HashPartitioner(numPartitions)),
      numPartitions,
      partitionCoalescer).values
  } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
  }
}
```
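The Scaladoc's rules for how many partitions come out of coalesce can be condensed into a tiny pure-Scala model. This is a sketch that does not use Spark at all: CoalesceRules and resultingPartitions are hypothetical names, and the model only tracks the partition count, not where the data goes or data locality.

```scala
// Hypothetical model of the partition-count rules stated in the coalesce Scaladoc (no Spark involved)
object CoalesceRules {
  /** Partition count of the RDD returned by coalesce, per the Scaladoc's rules. */
  def resultingPartitions(currentPartitions: Int,
                          numPartitions: Int,
                          shuffle: Boolean = false): Int = {
    // Same precondition as the require(...) at the top of coalesce
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    if (shuffle) numPartitions                      // with a shuffle the count can go up or down
    else math.min(numPartitions, currentPartitions) // narrow dependency: the count can only shrink
  }
}
```

For example, resultingPartitions(100, 1000) stays at 100, while resultingPartitions(100, 1000, shuffle = true) gives 1000, which is the case the @note describes.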
shuffle
Default behavior
shuffle defaults to false, so by default coalesce performs no shuffle.
```scala
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
```
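Processing flow with shuffle
Data distribution after the shuffle
- Elements are spread evenly across the output partitions, starting from a random partition; the start is seeded with hashing.byteswap32(index), so each upstream partition begins at a different but reproducible offset
- HashPartitioner: target partition = position % total number of partitions (a non-negative modulo, because the hash code of an Int key is the key itself)
- index: index of the upstream partition; numPartitions: number of partitions of the RDD after the shuffle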
```scala
val distributePartition = (index: Int, items: Iterator[T]) => {
  var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
  items.map { t =>
    // Note that the hash code of the key will just be the key itself. The HashPartitioner
    // will mod it with the number of total partitions.
    position = position + 1
    (position, t)
  }
} : Iterator[(Int, T)]
```
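To watch this distribution without a cluster, the function can be copied out of Spark and run on a plain iterator. A minimal sketch, assuming that routing an Int key reduces to the non-negative modulo described above; DistributeSketch, targetPartition and countsPerTarget are hypothetical helpers, not Spark APIs, and numPartitions is passed in explicitly instead of being captured from coalesce.

```scala
import java.util.Random
import scala.util.hashing

// Hypothetical standalone copy of distributePartition plus HashPartitioner-style routing
object DistributeSketch {
  /** Same steps as Spark's distributePartition, with numPartitions passed in explicitly. */
  def distributePartition[T](index: Int, items: Iterator[T], numPartitions: Int): Iterator[(Int, T)] = {
    // Seeded by the upstream partition index: random-looking but reproducible start
    var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
    items.map { t =>
      position = position + 1 // consecutive keys: start + 1, start + 2, ...
      (position, t)
    }
  }

  /** Non-negative modulo, i.e. the output partition an Int key is routed to. */
  def targetPartition(key: Int, numPartitions: Int): Int = {
    val raw = key % numPartitions
    raw + (if (raw < 0) numPartitions else 0)
  }

  /** Number of items from one upstream partition that land in each output partition. */
  def countsPerTarget[T](index: Int, items: Seq[T], numPartitions: Int): Vector[Int] = {
    val counts = Array.fill(numPartitions)(0)
    distributePartition(index, items.iterator, numPartitions).foreach { case (key, _) =>
      counts(targetPartition(key, numPartitions)) += 1
    }
    counts.toVector
  }
}
```

Because the keys produced by one upstream partition are consecutive integers, their residues cycle through all output partitions, so the per-partition counts from that upstream partition differ by at most one.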
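Shuffle logic
- new ShuffledRDD(...) builds the shuffled RDD
- Its parent is mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true), so distributePartition assigns every record its target key before HashPartitioner(numPartitions) routes it
- The shuffled RDD is wrapped in a CoalescedRDD whose number of partitions is set by numPartitions; .values then drops the Int keys and returns an RDD[T]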
```scala
new CoalescedRDD(
  new ShuffledRDD[Int, T, T](
    mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
    new HashPartitioner(numPartitions)),
  numPartitions,
  partitionCoalescer).values
```
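These steps can be imitated in memory to check that the shuffle path keeps every element and produces exactly numPartitions outputs. This is a simplified model, not Spark code: ShufflePathSketch and coalesceWithShuffle are hypothetical names, the shuffle is a plain filter per bucket, and the outer CoalescedRDD is assumed to map the already-shuffled partitions one to one.

```scala
import java.util.Random
import scala.util.hashing

// Hypothetical in-memory model of coalesce(numPartitions, shuffle = true)
object ShufflePathSketch {
  // HashPartitioner-style routing: non-negative modulo of the Int key
  private def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    raw + (if (raw < 0) mod else 0)
  }

  def coalesceWithShuffle[T](upstream: Seq[Seq[T]], numPartitions: Int): Vector[Vector[T]] = {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    // 1. Map side: tag each element with a key, as distributePartition does per upstream partition
    val keyed: Vector[(Int, T)] = upstream.zipWithIndex.toVector.flatMap { case (items, index) =>
      var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
      items.iterator.map { t =>
        position = position + 1
        (position, t)
      }.toVector
    }
    // 2. Shuffle: put each pair into the bucket chosen by the modulo rule
    val shuffled: Vector[Vector[(Int, T)]] = Vector.tabulate(numPartitions) { p =>
      keyed.filter { case (key, _) => nonNegativeMod(key, numPartitions) == p }
    }
    // 3. CoalescedRDD assumed one-to-one here; .values drops the synthetic keys
    shuffled.map(_.map(_._2))
  }
}
```

With three uneven upstream partitions (10, 2 and 28 elements) and numPartitions = 8, the model still returns 8 outputs holding all 40 elements, which is the "coalesce to a larger number of partitions" case from the @note.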
Processing flow without shuffle
- The RDD is wrapped directly in a CoalescedRDD; the number of new partitions is determined by numPartitions (capped at the current number of partitions)
```scala
new CoalescedRDD(this, numPartitions, partitionCoalescer)
```
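Without a shuffle, each new partition simply claims a set of parent partitions. The sketch below is a hypothetical, locality-free model: the real DefaultPartitionCoalescer also weighs the parents' preferred locations, which is ignored here, and NarrowCoalesceSketch and groupParents are illustrative names only.

```scala
// Hypothetical, locality-free model of coalesce(numPartitions, shuffle = false)
object NarrowCoalesceSketch {
  /** For each new partition, the range of parent partition ids it claims. */
  def groupParents(parentCount: Int, numPartitions: Int): Vector[Range] = {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    // Without a shuffle the count can only shrink, so cap it at the parent count
    val target = math.min(numPartitions, parentCount)
    Vector.tabulate(target) { i =>
      // Contiguous, nearly equal slices; Long arithmetic avoids Int overflow
      val start = (i.toLong * parentCount / target).toInt
      val end = ((i + 1).toLong * parentCount / target).toInt
      start until end
    }
  }
}
```

With 1000 parents and numPartitions = 100 this yields 100 groups of 10 parents each, the example given in the Scaladoc; no shuffle is needed because each new partition just iterates over the parent partitions it claimed.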