Spark operators: the coalesce operator
2022-08-05 06:11:00 【zdaiqing】
Full source code
```scala
/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions. If a larger number
 * of partitions is requested, it will stay at the current number of partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * @note With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner. The optional partition coalescer
 * passed in must be serializable.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](
        mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
        new HashPartitioner(numPartitions)),
      numPartitions,
      partitionCoalescer).values
  } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
  }
}
```
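The scaladoc states a simple rule for the resulting number of partitions. Without a shuffle, coalesce can only shrink the partition count. With a shuffle, it can shrink or grow. The sketch below is a hypothetical model of that rule in plain Scala and is not a Spark API. CoalesceCountModel and resultingPartitions are illustrative names. It assumes a non-empty parent RDD.

```scala
// Hypothetical model (not Spark code) of the partition count coalesce produces,
// following the behavior described in the scaladoc above.
object CoalesceCountModel {
  /** Expected partition count of coalesce(requested, shuffle) on a parent with `current` partitions. */
  def resultingPartitions(current: Int, requested: Int, shuffle: Boolean): Int = {
    // Same precondition as the require(...) at the top of coalesce.
    require(requested > 0, s"Number of partitions ($requested) must be positive.")
    if (shuffle) requested            // shuffle = true: exactly `requested`, larger or smaller
    else math.min(current, requested) // narrow dependency: can only shrink, never grow
  }

  def main(args: Array[String]): Unit = {
    println(resultingPartitions(1000, 100, shuffle = false)) // 100
    println(resultingPartitions(100, 1000, shuffle = false)) // 100 (stays at the current count)
    println(resultingPartitions(100, 1000, shuffle = true))  // 1000
  }
}
```

Increasing the partition count is therefore only possible on the shuffle path.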
shuffle
Default value
shuffle defaults to false, so by default coalesce does not perform a shuffle.
```scala
def coalesce(numPartitions: Int, shuffle: Boolean = false, // no shuffle unless requested
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
```
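Processing flow with shuffle
Data distribution after the shuffle
- Elements are distributed evenly across the output partitions, starting from a random partition.
- HashPartitioner: the output partition of a record = position % total number of partitions (the hash code of an Int key is the key itself).
- index: the index of the upstream partition; numPartitions: the number of partitions of the RDD produced by this shuffle.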
```scala
val distributePartition = (index: Int, items: Iterator[T]) => {
  var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
  items.map { t =>
    // Note that the hash code of the key will just be the key itself. The HashPartitioner
    // will mod it with the number of total partitions.
    position = position + 1
    (position, t)
  }
} : Iterator[(Int, T)]
```
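To see why the keys spread records evenly, the closure can be re-created outside Spark. This is a minimal standalone sketch that uses only the Scala standard library. DistributePartitionSketch, distribute and targetPartition are hypothetical names. targetPartition stands in for what HashPartitioner does with an Int key, which is a non-negative key % numPartitions.

```scala
import scala.util.Random
import scala.util.hashing

// Standalone re-creation of the distributePartition closure (illustration only, not Spark code).
object DistributePartitionSketch {
  /** Tags each item with an increasing key, starting from a random position seeded by `index`. */
  def distribute[T](index: Int, items: Iterator[T], numPartitions: Int): Iterator[(Int, T)] = {
    var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
    items.map { t =>
      position = position + 1
      (position, t)
    }
  }

  /** Assumed stand-in for HashPartitioner on an Int key: non-negative key % numPartitions. */
  def targetPartition(key: Int, numPartitions: Int): Int = {
    val raw = key % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  def main(args: Array[String]): Unit = {
    val numPartitions = 4
    val counts = distribute(0, (1 to 10).iterator, numPartitions)
      .map { case (key, _) => targetPartition(key, numPartitions) }
      .toList
      .groupBy(identity)
      .map { case (partition, hits) => partition -> hits.size }
    println(counts) // each of the 4 output partitions receives 2 or 3 of the 10 items
  }
}
```

The keys from one upstream partition are consecutive integers, so taking them modulo numPartitions cycles round-robin through the output partitions. The random start keeps different upstream partitions from all beginning at output partition 0. Seeding the start with the partition index means a recomputed upstream partition gets the same keys again.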
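Shuffle logic
- new ShuffledRDD(...) builds the shuffle RDD, partitioned by a HashPartitioner with numPartitions partitions.
- While the shuffle RDD is being built, mapPartitionsWithIndexInternal applies distributePartition to every upstream partition, which tags each record with the key that decides its output partition.
- The shuffle RDD is then wrapped in a CoalescedRDD whose number of partitions is determined by numPartitions, and .values drops the Int keys so the result is an RDD[T] again.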
```scala
new CoalescedRDD(
  new ShuffledRDD[Int, T, T](
    mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
    new HashPartitioner(numPartitions)),
  numPartitions,
  partitionCoalescer).values
```
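The three steps can be simulated in memory to show the scenario from the @note, where a few abnormally large partitions are reshuffled into more partitions. This is a sketch under simplifying assumptions. There is no cluster and no serialization. Each upstream partition is a Seq, and HashPartitioner is modelled as a non-negative modulo on the Int key. ShuffleCoalesceSketch and shuffleCoalesce are hypothetical names.

```scala
import scala.util.Random
import scala.util.hashing

// In-memory simulation (hypothetical, not Spark code) of coalesce(n, shuffle = true):
// key records like distributePartition, bucket them like HashPartitioner, keep only the values.
object ShuffleCoalesceSketch {
  def shuffleCoalesce[T](upstream: Seq[Seq[T]], numPartitions: Int): Vector[Vector[T]] = {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    val buckets = Vector.fill(numPartitions)(Vector.newBuilder[T])
    for ((items, index) <- upstream.zipWithIndex) {
      // Map side: each upstream partition starts from its own pseudo-random position.
      var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
      for (t <- items) {
        position += 1
        // Shuffle: an Int key goes to the non-negative key % numPartitions.
        val target = ((position % numPartitions) + numPartitions) % numPartitions
        buckets(target) += t // only the value is stored, mirroring `.values`
      }
    }
    buckets.map(_.result())
  }

  def main(args: Array[String]): Unit = {
    // Two upstream partitions, one of them abnormally large, spread over 10 output partitions.
    val skewed = Seq((1 to 1000).toList, (1 to 10).toList)
    println(shuffleCoalesce(skewed, 10).map(_.size)) // all 10 sizes are 101
  }
}
```

The 1000 consecutive keys of the large partition land exactly 100 times in each of the 10 buckets, and the 10 keys of the small one land once in each. The skew disappears even though the partition count went up from 2 to 10.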
Processing flow without shuffle
- The RDD is wrapped directly in a CoalescedRDD, whose number of partitions is determined by numPartitions. There is no shuffle, only a narrow dependency on the parent partitions.
```scala
new CoalescedRDD(this, numPartitions, partitionCoalescer)
```
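The scaladoc example says that going from 1000 to 100 partitions makes each new partition claim 10 of the current ones. The sketch below models that grouping with contiguous index ranges. This is an assumption. The real default partition coalescer also takes the preferred locations of the parent partitions into account, so treat this as the no-locality case only. NarrowCoalesceSketch and groupParents are hypothetical names.

```scala
// Simplified model (assumption: the parent has no preferred locations) of the no-shuffle path:
// each output partition claims a contiguous range of parent partition indices.
object NarrowCoalesceSketch {
  /** For each output partition, the parent partition indices it reads from. */
  def groupParents(parentCount: Int, numPartitions: Int): Vector[Range] = {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    val target = math.min(parentCount, numPartitions) // cannot grow without a shuffle
    Vector.tabulate(target) { i =>
      val start = ((i.toLong * parentCount) / target).toInt
      val end = (((i.toLong + 1) * parentCount) / target).toInt
      start until end
    }
  }

  def main(args: Array[String]): Unit = {
    val groups = groupParents(1000, 100)
    println(groups.size)       // 100
    println(groups.head.toList) // the first output partition reads parent partitions 0 to 9
  }
}
```

No record moves between executors here. Each output partition simply iterates over its parent partitions in turn, which is why a drastic coalesce such as numPartitions = 1 funnels all of the work into a single task.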