Spark operators: the coalesce operator
2022-08-05 05:18:00 【zdaiqing】
Full code
/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions. If a larger number
 * of partitions is requested, it will stay at the current number of partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * @note With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner. The optional partition coalescer
 * passed in must be serializable.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](
        mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
        new HashPartitioner(numPartitions)),
      numPartitions,
      partitionCoalescer).values
  } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
  }
}
shuffle
Default behavior
The shuffle parameter defaults to false, so coalesce does not shuffle unless shuffle = true is passed explicitly.
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
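The effect of this default on the resulting partition count can be sketched as a small pure function. This is only a model of the rule stated in the doc comment (without a shuffle a larger request is capped at the current count, with a shuffle the requested count is used); `expectedPartitions` is a hypothetical helper for illustration, not part of Spark, and the real count is decided by CoalescedRDD and its PartitionCoalescer.

```scala
object CoalescePartitionCount {
  // Hypothetical helper: models the partition-count rule from the doc comment only.
  def expectedPartitions(current: Int, requested: Int, shuffle: Boolean = false): Int = {
    require(requested > 0, s"Number of partitions ($requested) must be positive.")
    // Without a shuffle the dependency stays narrow, so the count can only shrink.
    if (shuffle) requested else math.min(current, requested)
  }

  def main(args: Array[String]): Unit = {
    println(expectedPartitions(current = 1000, requested = 100))                 // shrink, no shuffle
    println(expectedPartitions(current = 100, requested = 1000))                 // capped at current
    println(expectedPartitions(current = 100, requested = 1000, shuffle = true)) // grow via shuffle
  }
}
```

Treat the non-shuffle result as an upper bound: the coalescer may produce fewer partitions in some cases.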
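Processing flow with shuffle
Data distribution after the shuffle
- Elements are distributed evenly (round-robin) across the output partitions, starting from a pseudo-random partition. The starting position comes from a Random seeded with byteswap32(index), so it is deterministic for a given upstream partition.
- HashPartitioner: target partition index = position % total number of partitions (the hash code of an Int key is the key itself)
- index: index of the upstream partition; numPartitions: number of partitions of the RDD after the shuffle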
val distributePartition = (index: Int, items: Iterator[T]) => {
  var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
  items.map { t =>
    // Note that the hash code of the key will just be the key itself. The HashPartitioner
    // will mod it with the number of total partitions.
    position = position + 1
    (position, t)
  }
} : Iterator[(Int, T)]
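To see the keying in isolation, here is a minimal sketch that runs the same logic on a plain Scala iterator, with no Spark on the classpath. `assignKeys` and `targetPartition` are illustrative names introduced here; `targetPartition` is a stand-in for the HashPartitioner step, assuming it takes the key's hash code modulo numPartitions and keeps the result non-negative.

```scala
import scala.util.Random
import scala.util.hashing.byteswap32

object DistributePartitionSketch {
  // Same keying logic as distributePartition, with numPartitions passed explicitly.
  def assignKeys[T](index: Int, items: Iterator[T], numPartitions: Int): Iterator[(Int, T)] = {
    // The start offset is pseudo-random but fixed for a given upstream partition index.
    var position = new Random(byteswap32(index)).nextInt(numPartitions)
    items.map { t =>
      position = position + 1
      (position, t)
    }
  }

  // Stand-in for the HashPartitioner step (assumption): an Int key hashes to itself.
  def targetPartition(key: Int, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  def main(args: Array[String]): Unit = {
    val numPartitions = 4
    assignKeys(0, Iterator("a", "b", "c", "d", "e"), numPartitions).foreach {
      case (key, value) =>
        println(s"$value -> key $key -> partition ${targetPartition(key, numPartitions)}")
    }
  }
}
```

Because the keys are consecutive integers, consecutive elements of one upstream partition land on consecutive output partitions and wrap around, which is what makes the distribution even.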
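Shuffle logic
- new ShuffledRDD() builds the shuffled RDD
- Its input is produced by mapPartitionsWithIndexInternal, which applies distributePartition to every upstream partition to key each element; the HashPartitioner then routes each key to its output partition
- The ShuffledRDD is wrapped in a CoalescedRDD whose partition count is set by the new numPartitions value, and .values drops the temporary Int keys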
new CoalescedRDD(
  new ShuffledRDD[Int, T, T](
    mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
    new HashPartitioner(numPartitions)),
  numPartitions,
  partitionCoalescer).values
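The three steps above can be imitated on in-memory collections to show why shuffle = true helps with skewed input. The sketch below is a simulation under stated assumptions, not Spark code: `rebalance` is a hypothetical name, each inner Seq plays the role of one upstream partition, and routing by `key % numPartitions` stands in for the HashPartitioner.

```scala
import scala.util.Random
import scala.util.hashing.byteswap32

object ShuffleCoalesceSketch {
  // Simulates coalesce(numPartitions, shuffle = true) on in-memory "partitions".
  def rebalance[T](upstream: Seq[Seq[T]], numPartitions: Int): Seq[Seq[T]] = {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    // Step 1: key every element, as distributePartition does per upstream partition.
    val keyed: Seq[(Int, T)] = upstream.zipWithIndex.flatMap { case (items, index) =>
      var position = new Random(byteswap32(index)).nextInt(numPartitions)
      items.map { t =>
        position = position + 1
        (position, t)
      }
    }
    // Step 2: route by key modulo numPartitions (keys are never negative here),
    // then drop the keys, mirroring `.values`.
    (0 until numPartitions).map { p =>
      keyed.collect { case (k, t) if k % numPartitions == p => t }
    }
  }

  def main(args: Array[String]): Unit = {
    // Two upstream partitions, one abnormally large.
    val skewed = Seq(Seq.fill(100)("a"), Seq.fill(4)("b"))
    println(rebalance(skewed, 4).map(_.size))
  }
}
```

With 100 and 4 elements spread round-robin over four outputs, every output partition receives 26 elements regardless of the starting offsets, and the output has more partitions than the input.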
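Processing flow without shuffle
- The RDD is wrapped directly in a CoalescedRDD, whose partition count is determined by the new numPartitions value. The dependency stays narrow, so the count cannot exceed the current number of partitions.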
new CoalescedRDD(this, numPartitions, partitionCoalescer)
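The narrow dependency can be pictured as each new partition claiming a group of parent partitions, as in the 1000-to-100 example from the doc comment. The sketch below shows one simple way to form such groups with contiguous index ranges; `groupParents` is a hypothetical helper, and the actual grouping is chosen by the PartitionCoalescer, which may also take data locality into account.

```scala
object NarrowCoalesceSketch {
  // Hypothetical helper: splits parent partition indices into contiguous groups,
  // one group per new partition. No data moves between groups.
  def groupParents(parentCount: Int, numPartitions: Int): Seq[Range] = {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    // A larger request is capped at the current number of partitions.
    val target = math.min(parentCount, numPartitions)
    (0 until target).map { i =>
      val start = ((i.toLong * parentCount) / target).toInt
      val end = (((i.toLong + 1) * parentCount) / target).toInt
      start until end
    }
  }

  def main(args: Array[String]): Unit = {
    val groups = groupParents(parentCount = 1000, numPartitions = 100)
    println(s"${groups.size} new partitions, first claims parents ${groups.head}")
  }
}
```

Going from 1000 to 100 partitions this way gives 100 groups of 10 parents each, and asking for more partitions than exist leaves one parent per group.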