当前位置:网站首页>spark算子-parallelize算子
spark算子-parallelize算子
2022-08-05 05:18:00 【zdaiqing】
parallelize算子
parallelize算子说明
功能说明
通过在驱动程序(Scala/Java其他)中的现有集合上调用SparkContext'方法来创建并行化集合。复制集合的元素以形成可以并行操作的分布式数据集。
例如,这里是如何创建一个包含数字 1 到 5 的并行化集合:parallelizeSeq
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
parallelize算子源码
参数说明
seq:
需要进行并行化集合创建的现有集合
numSlices:
并行化集合的并行度(分区数)
该参数没传值情况下,默认赋值defaultParallelism:
全局配置的spark.default.parallelism参数有值,就取参数值;
全局配置的spark.default.parallelism参数没有设置:
集群模式下,在cpu总核数 vs 2之间取最大值;
local模式下,取cpu总核数;
实现逻辑
底层根据传入参数构建ParallelCollectionRDD;
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
ParallelCollectionRDD的分区原理
getPartitions方法源码
该方法返回分区列表;
调用ParallelCollectionRDD伴生对象中slice方法实现数据分区;
根据分区后的数据列表,构建返回arallelCollectionPartition 的列表;
override def getPartitions: Array[Partition] = {
val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
slice方法源码
分区规则
返回一个分区迭代器:迭代器元素为一个个元组,元组中存放每个分区开始和结束位置
开始:分区id * 数据长度 / 分区数
结束:(分区id + 1) * 数据长度 / 分区数
说明
针对range集合,将每个分区编码为一个range;该操作有利于减小内存开销,对于大数据集的处理非常高效;
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
// 保证分区数 > 0
if (numSlices < 1) {
throw new IllegalArgumentException("Positive number of partitions required")
}
// 数据分区实现逻辑:
// 返回一个分区迭代器:迭代器元素为一个个元组,元组中存放每个分区开始和结束位置
// 开始:分区id * 数据长度 / 分区数
// 结束:(分区id + 1) * 数据长度 / 分区数
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map {
i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
seq match {
//对于range集合,将根据分区规则分区后的数据,每个分区构建一个range;最后返回一个新的range集合
case r: Range =>
positions(r.length, numSlices).zipWithIndex.map {
case ((start, end), index) =>
// 针对inclusive的range,最后一个分区使用Inclusive方法构建
if (r.isInclusive && index == numSlices - 1) {
new Range.Inclusive(r.start + start * r.step, r.end, r.step)
}
else {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
}
}.toSeq.asInstanceOf[Seq[Seq[T]]]
// 对于NumericRange集合,将根据分区规则分区后的数据,每个分区构建一个NumericRange;最后返回一个以NumericRange为元素的ArrayBuffer
case nr: NumericRange[_] =>
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
var r = nr
for ((start, end) <- positions(nr.length, numSlices)) {
val sliceSize = end - start
slices += r.take(sliceSize).asInstanceOf[Seq[T]]
r = r.drop(sliceSize)
}
slices
//对于其他的集合,利用array的slice方法根据分区规则对数据进行分区后;最后返回一个新的集合;
case _ =>
val array = seq.toArray // To prevent O(n^2) operations for List etc
positions(array.length, numSlices).map {
case (start, end) =>
array.slice(start, end).toSeq
}.toSeq
}
}
参考文件
spark算子-textFile算子
RDD编程指南
spark 创建算子源码解析
【spark】RDD分区解析
深入源码理解Spark RDD的数据分区原理
边栏推荐
猜你喜欢
随机推荐
Autoware中安装Yolo3目标检测模块遇到的问题
framebuffer应用编程及文字显示(1)
(C语言)计算结构体大小——结构体内存对齐
C语言入门笔记 —— 函数(1)
腾讯云消息队列CMQ
Unity中的GetEnumerator 方法及MoveNext、Reset方法
硬核!Cocos开发面试必备十问,让你offer拿到手软
乘云科技受邀出席2022阿里云合作伙伴大会荣获“聚力行远奖”
idea 快速日志
什么是全栈设计师?
leetCode刷题之第31题
2020年手机上最好的25种免费游戏
Cocos Creator开发中的事件响应
UiPath简介
原型版本管理
Unity物理引擎中的碰撞、角色控制器、Cloth组件(布料)、关节 Joint
【3D模型教程】ZBrush如何表现皮肤纹理?
二、自动配置之底层注解
Lua,ILRuntime, HybridCLR(wolong)/huatuo热更对比分析
【ts】typescript高阶:映射类型与keyof









