Spark operators: the coalesce operator
2022-08-05 06:11:00 【zdaiqing】
Complete code
/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions. If a larger number
 * of partitions is requested, it will stay at the current number of partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * @note With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner. The optional partition coalescer
 * passed in must be serializable.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]
    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](
        mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
        new HashPartitioner(numPartitions)),
      numPartitions,
      partitionCoalescer).values
  } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
  }
}
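A minimal usage sketch of both branches (assuming a SparkContext named sc; the sizes and partition counts are illustrative):

// Sketch, assuming a SparkContext `sc`; numbers are illustrative.
val rdd = sc.parallelize(1 to 1000, 100)         // 100 initial partitions

// Narrow dependency: each of the 10 new partitions claims ~10 old ones; no shuffle.
val narrowed = rdd.coalesce(10)
println(narrowed.getNumPartitions)               // 10

// Drastic coalesce: shuffle = true keeps the upstream work parallel.
val single = rdd.coalesce(1, shuffle = true)
println(single.getNumPartitions)                 // 1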
shuffle
Default case
shuffle defaults to false, so by default no shuffle is performed.
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
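One consequence of this default, per the doc comment above: without a shuffle, coalesce can only shrink the partition count, so requesting more partitions than the RDD has is a no-op. A quick check (a sketch; `sc` and the counts are assumed):

// Sketch: with the default shuffle = false, coalesce can only shrink.
val rdd = sc.parallelize(1 to 100, 4)
println(rdd.coalesce(2).getNumPartitions)                  // 2
println(rdd.coalesce(8).getNumPartitions)                  // still 4
println(rdd.coalesce(8, shuffle = true).getNumPartitions)  // 8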
shuffle processing flow
Data distribution after the shuffle
- Starting from a random partition, elements are distributed evenly across the output partitions
- HashPartitioner: target partition = position % total number of partitions
- index: the index of the upstream partition; numPartitions: the number of partitions of the post-shuffle RDD (a standalone sketch of this placement follows the snippet below)
val distributePartition = (index: Int, items: Iterator[T]) => {
  var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
  items.map { t =>
    // Note that the hash code of the key will just be the key itself. The HashPartitioner
    // will mod it with the number of total partitions.
    position = position + 1
    (position, t)
  }
} : Iterator[(Int, T)]
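The placement logic can be reproduced outside Spark (a standalone sketch; byteswap32 comes from scala.util.hashing, and the index and element values are made up):

import scala.util.Random
import scala.util.hashing

val numPartitions = 4
val index = 0                                    // upstream partition index
val items = Iterator("a", "b", "c", "d", "e")

// Random start seeded by the upstream partition index, then +1 per element;
// HashPartitioner later takes the Int key modulo numPartitions, so elements
// land round-robin starting at a per-upstream-partition random offset.
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.foreach { t =>
  position += 1
  println(s"$t -> partition ${position % numPartitions}")
}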
shuffle logic
- new ShuffledRDD() builds the shuffle RDD
- While building the shuffle RDD, the distributePartition function is applied to redistribute the data across partitions
- The shuffle RDD is then wrapped in a CoalescedRDD, whose new partition count is determined by numPartitions (an equivalent public-API sketch follows the snippet below)
new CoalescedRDD(
  new ShuffledRDD[Int, T, T](
    mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
    new HashPartitioner(numPartitions)),
  numPartitions,
  partitionCoalescer).values
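Roughly the same pipeline can be written with public APIs (a sketch, not the actual implementation: the real code uses the internal mapPartitionsWithIndexInternal and a PartitionCoalescer; the input RDD and target count n are assumptions):

import scala.util.Random
import scala.util.hashing
import org.apache.spark.HashPartitioner

val n = 8                                        // target partition count (assumed)
val rdd = sc.parallelize(1 to 100, 4)            // assumed input

// Attach a synthetic Int key that walks forward from a random offset per partition.
val keyed = rdd.mapPartitionsWithIndex { (index, items) =>
  var position = new Random(hashing.byteswap32(index)).nextInt(n)
  items.map { t => position += 1; (position, t) }
}
// partitionBy shuffles by the synthetic key; .values drops the key again,
// mirroring the .values call at the end of coalesce's shuffle branch.
val reshuffled = keyed.partitionBy(new HashPartitioner(n)).values
println(reshuffled.getNumPartitions)             // 8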
Non-shuffle processing flow
- The RDD is wrapped directly in a CoalescedRDD; the new partition count is determined by numPartitions (see the lineage check after the snippet)
new CoalescedRDD(this, numPartitions, partitionCoalescer)
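One way to observe the difference (a sketch; sc is assumed): the lineage of the no-shuffle path shows a CoalescedRDD sitting directly on its parent, with no ShuffledRDD in between.

// Sketch: no ShuffledRDD appears in the no-shuffle lineage.
val merged = sc.parallelize(1 to 100, 8).coalesce(2)
println(merged.toDebugString)   // CoalescedRDD on top of the parent, no shuffle stage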