Spark operators: the coalesce operator
2022-08-05 06:11:00 【zdaiqing】
Full source code
/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions. If a larger number
 * of partitions is requested, it will stay at the current number of partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * @note With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner. The optional partition coalescer
 * passed in must be serializable.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](
        mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
        new HashPartitioner(numPartitions)),
      numPartitions,
      partitionCoalescer).values
  } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
  }
}
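The partition-count rules stated in the Scaladoc above can be condensed into a few lines. The sketch below is plain Scala with no Spark dependency; CoalesceRules and resultingPartitions are hypothetical names, and the function only restates the documented behavior rather than calling Spark.

```scala
// Minimal sketch of the partition-count rules from the coalesce Scaladoc.
// NOT Spark code: CoalesceRules / resultingPartitions are hypothetical names.
object CoalesceRules {
  /** Number of partitions coalesce would produce, per the documented rules.
    *
    * @param current   number of partitions in the parent RDD
    * @param requested the numPartitions argument
    * @param shuffle   the shuffle argument
    */
  def resultingPartitions(current: Int, requested: Int, shuffle: Boolean): Int = {
    // Same check as the require(...) at the top of coalesce.
    require(requested > 0, s"Number of partitions ($requested) must be positive.")
    if (shuffle) requested            // with a shuffle the count may also grow
    else math.min(current, requested) // narrow dependency: partitions can only be merged
  }
}
```

For example, resultingPartitions(100, 1000, shuffle = false) stays at 100, while the same call with shuffle = true gives 1000.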
shuffle
Default behavior
By default, shuffle is false, so coalesce does not perform a shuffle.
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
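Processing flow with shuffle = true
How data is distributed by the shuffle
- Starting from a random output partition (pseudo-random, seeded by the upstream partition index), elements are spread evenly across the output partitions
- HashPartitioner: target partition of a record = position % total number of partitions (the hash code of an Int key is the key itself)
- index: index of the upstream partition; numPartitions: number of partitions of the RDD produced by the shuffle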
val distributePartition = (index: Int, items: Iterator[T]) => {
  var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
  items.map { t =>
    // Note that the hash code of the key will just be the key itself. The HashPartitioner
    // will mod it with the number of total partitions.
    position = position + 1
    (position, t)
  }
} : Iterator[(Int, T)]
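To see why this spreads elements evenly, the key assignment can be run outside Spark. The following sketch copies the idea of distributePartition and pairs it with a non-negative modulo like the one HashPartitioner applies to Int keys; DistributeSketch and its methods are hypothetical names, and Spark itself is not used.

```scala
import scala.util.Random
import scala.util.hashing.byteswap32

// Spark-free sketch of distributePartition's key assignment plus the
// HashPartitioner-style non-negative modulo. Names are hypothetical.
object DistributeSketch {
  /** Start at a pseudo-random position seeded by the upstream partition
    * index, then increment the key once per element. */
  def assignKeys[T](index: Int, items: Iterator[T], numPartitions: Int): Iterator[(Int, T)] = {
    var position = new Random(byteswap32(index)).nextInt(numPartitions)
    items.map { t =>
      position = position + 1
      (position, t)
    }
  }

  /** Non-negative modulo of an Int key, used to pick the output partition. */
  def targetPartition(key: Int, numPartitions: Int): Int = {
    val rawMod = key % numPartitions
    rawMod + (if (rawMod < 0) numPartitions else 0)
  }

  /** How many elements of one upstream partition land in each output partition. */
  def countsPerOutput(index: Int, numItems: Int, numPartitions: Int): Seq[Int] = {
    val counts = Array.fill(numPartitions)(0)
    assignKeys(index, Iterator.range(0, numItems), numPartitions).foreach { case (key, _) =>
      counts(targetPartition(key, numPartitions)) += 1
    }
    counts.toSeq
  }
}
```

Because the keys of one upstream partition are consecutive integers, their residues cycle through all output partitions, so the per-partition counts differ by at most one (10 elements over 4 partitions give two partitions with 3 elements and two with 2).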
Shuffle logic
- new ShuffledRDD(...) builds the shuffle RDD
- When the shuffle RDD is built, distributePartition is applied to each upstream partition (via mapPartitionsWithIndexInternal) to assign every record the key that decides its target partition
- The shuffle RDD is then wrapped in a CoalescedRDD whose number of partitions is determined by numPartitions, and .values drops the Int keys
new CoalescedRDD(
  new ShuffledRDD[Int, T, T](
    mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
    new HashPartitioner(numPartitions)),
  numPartitions,
  partitionCoalescer).values
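The whole shuffle = true path can be approximated end to end on in-memory data. This sketch assumes upstream partitions can be modelled as Seq[Seq[T]]; ShuffleCoalesceSketch.shuffleCoalesce is a hypothetical helper that assigns keys, buckets records by key, and drops the keys the way .values does. The CoalescedRDD wrapper and real shuffle I/O are left out.

```scala
import scala.util.Random
import scala.util.hashing.byteswap32

// Simplified, Spark-free simulation of coalesce(numPartitions, shuffle = true).
// Steps reproduced: (1) distributePartition-style keys, (2) bucketing by
// non-negative key modulo, (3) dropping keys like .values. Hypothetical names.
object ShuffleCoalesceSketch {
  def shuffleCoalesce[T](upstream: Seq[Seq[T]], numPartitions: Int): Seq[Seq[T]] = {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    // Step 1: element i of partition `index` gets key start + i + 1.
    val keyed: Seq[(Int, T)] = upstream.zipWithIndex.flatMap { case (items, index) =>
      val start = new Random(byteswap32(index)).nextInt(numPartitions)
      items.zipWithIndex.map { case (t, i) => (start + i + 1, t) }
    }
    // Step 2: bucket by non-negative modulo (an Int's hash code is itself).
    val buckets = Array.fill(numPartitions)(Vector.empty[T])
    keyed.foreach { case (key, t) =>
      val p = ((key % numPartitions) + numPartitions) % numPartitions
      buckets(p) = buckets(p) :+ t // Step 3: keep only the value
    }
    buckets.toSeq
  }
}
```

Here a skewed input such as Seq((1 to 9).toSeq, Seq(100)) coalesced into 5 partitions comes out with more partitions than it started with, matching the Scaladoc note that shuffle = true can increase the partition count, and the large input partition is split across all outputs.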
Processing flow without shuffle
- The RDD is wrapped directly in a CoalescedRDD; the number of new partitions is determined by numPartitions (it never exceeds the current count), which gives a narrow dependency with no shuffle
new CoalescedRDD(this, numPartitions, partitionCoalescer)
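Without a shuffle, each new partition simply reads a group of existing parent partitions. The sketch below assumes a plain even split into contiguous index ranges, which reproduces the 1000 to 100 example from the Scaladoc (10 parents per new partition). Spark's actual grouping is done by a PartitionCoalescer, and the default one also considers the preferred locations of parent partitions; NarrowCoalesceSketch.groupParents is a hypothetical name.

```scala
// Simplified sketch of the non-shuffle path: each output partition claims a
// contiguous range of parent partitions, so no data is shuffled. The even
// range split is an assumption; Spark's default coalescer is locality-aware.
object NarrowCoalesceSketch {
  /** For each output partition, the indices of the parent partitions it reads. */
  def groupParents(parentCount: Int, numPartitions: Int): Seq[Range] = {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    // Without a shuffle the partition count can only shrink, never grow.
    val target = math.min(parentCount, numPartitions)
    (0 until target).map { i =>
      val start = ((i.toLong * parentCount) / target).toInt
      val end = (((i.toLong + 1) * parentCount) / target).toInt
      start until end
    }
  }
}
```

Asking for more partitions than exist, e.g. groupParents(100, 1000), still yields 100 groups, matching the documented behavior that the count stays at the current number of partitions.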