Spark operators: the coalesce operator
2022-08-05 06:11:00 【zdaiqing】
Complete source code
/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions. If a larger number
 * of partitions is requested, it will stay at the current number of partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * @note With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner. The optional partition coalescer
 * passed in must be serializable.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](
        mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
        new HashPartitioner(numPartitions)),
      numPartitions,
      partitionCoalescer).values
  } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
  }
}
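As a quick sanity check of the behavior described in the Scaladoc above, here is a minimal sketch (the local SparkContext and the object wrapper are illustrative additions, not part of the original post): without shuffle the partition count can only shrink, while shuffle = true also allows it to grow.

import org.apache.spark.{SparkConf, SparkContext}

object CoalesceDemo {
  def main(args: Array[String]): Unit = {
    // Local SparkContext purely for illustration.
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("coalesce-demo"))

    val rdd = sc.parallelize(1 to 1000, 8)                       // 8 initial partitions

    println(rdd.coalesce(2).getNumPartitions)                    // 2: narrow dependency, no shuffle
    println(rdd.coalesce(100).getNumPartitions)                  // 8: capped at the current count
    println(rdd.coalesce(100, shuffle = true).getNumPartitions)  // 100: shuffle allows growth

    sc.stop()
  }
}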
The shuffle parameter
Default behavior
shuffle defaults to false, so by default no shuffle is performed.
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
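One way to confirm that this default path adds no shuffle stage is to inspect the RDD lineage. A small sketch, assuming an existing SparkContext sc:

val parent = sc.parallelize(1 to 100, 10)

// Default shuffle = false: the lineage shows only a CoalescedRDD over the parent (narrow dependency).
println(parent.coalesce(2).toDebugString)

// shuffle = true: the lineage additionally contains a ShuffledRDD stage.
println(parent.coalesce(2, shuffle = true).toDebugString)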
Processing flow with shuffle = true
Data distribution after the shuffle
- Starting from a random partition, elements are distributed evenly across the output partitions (a standalone sketch of this key assignment follows the code below).
- HashPartitioner: the target partition of a record = position % total number of partitions.
- index: the index of the upstream partition; numPartitions: the number of partitions of the RDD after the shuffle.
val distributePartition = (index: Int, items: Iterator[T]) => {
  var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
  items.map { t =>
    // Note that the hash code of the key will just be the key itself. The HashPartitioner
    // will mod it with the number of total partitions.
    position = position + 1
    (position, t)
  }
} : Iterator[(Int, T)]
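To make the key assignment concrete, here is a standalone sketch (outside Spark, with hypothetical names) that reproduces what distributePartition does: each upstream partition starts at a pseudo-random position derived from its index, then hands out consecutive keys, which the HashPartitioner later maps to key % numPartitions.

import scala.util.Random
import scala.util.hashing

val numPartitions = 4  // number of partitions after the shuffle

// Same logic as distributePartition, applied to an in-memory Seq instead of an Iterator.
def assignKeys(index: Int, items: Seq[String]): Seq[(Int, String)] = {
  var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
  items.map { t =>
    position += 1
    (position, t)
  }
}

val part0 = assignKeys(0, Seq("a", "b", "c"))  // records from upstream partition 0
val part1 = assignKeys(1, Seq("d", "e", "f"))  // records from upstream partition 1

// Target output partition per record, computed the way HashPartitioner would
// (non-negative modulo of the integer key).
(part0 ++ part1).foreach { case (key, value) =>
  println(s"$value -> partition ${((key % numPartitions) + numPartitions) % numPartitions}")
}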
Shuffle logic
- new ShuffledRDD() builds the shuffled RDD.
- While building the shuffled RDD, the distributePartition function is applied to redistribute the data across partitions.
- The shuffled RDD is then wrapped in a CoalescedRDD, whose number of partitions is determined by numPartitions.
new CoalescedRDD(
  new ShuffledRDD[Int, T, T](
    mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
    new HashPartitioner(numPartitions)),
  numPartitions,
  partitionCoalescer).values
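It is also worth noting that repartition on RDD is defined in terms of exactly this shuffle path; in the Spark source it is essentially the following (reproduced from memory, so treat it as indicative):

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}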
Processing flow without shuffle
- The RDD is wrapped directly in a CoalescedRDD; the number of new partitions is determined by numPartitions.
new CoalescedRDD(this, numPartitions, partitionCoalescer)
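The practical consequence, as the Scaladoc warns, is that a drastic coalesce without shuffle pulls the whole upstream computation into the few remaining tasks. A hedged sketch (assuming an existing SparkContext sc and illustrative output paths):

def expensive(x: Int): Int = { Thread.sleep(1); x * 2 }

val data = sc.parallelize(1 to 10000, 100)

// Without shuffle: the map is pipelined into the single coalesced partition,
// so one task does all the work.
data.map(expensive).coalesce(1).saveAsTextFile("/tmp/coalesce_no_shuffle")

// With shuffle = true: the map still runs as 100 parallel tasks; only the
// final stage after the shuffle has a single partition.
data.map(expensive).coalesce(1, shuffle = true).saveAsTextFile("/tmp/coalesce_with_shuffle")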