当前位置:网站首页>spark算子-parallelize算子
spark算子-parallelize算子
2022-08-05 05:18:00 【zdaiqing】
parallelize算子
parallelize算子说明
功能说明
通过在驱动程序(Scala/Java其他)中的现有集合上调用SparkContext'方法来创建并行化集合。复制集合的元素以形成可以并行操作的分布式数据集。
例如,这里是如何创建一个包含数字 1 到 5 的并行化集合:parallelizeSeq
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
parallelize算子源码
参数说明
seq:
需要进行并行化集合创建的现有集合
numSlices:
并行化集合的并行度(分区数)
该参数没传值情况下,默认赋值defaultParallelism:
全局配置的spark.default.parallelism参数有值,就取参数值;
全局配置的spark.default.parallelism参数没有设置:
集群模式下,在cpu总核数 vs 2之间取最大值;
local模式下,取cpu总核数;
实现逻辑
底层根据传入参数构建ParallelCollectionRDD;
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
ParallelCollectionRDD的分区原理
getPartitions方法源码
该方法返回分区列表;
调用ParallelCollectionRDD伴生对象中slice方法实现数据分区;
根据分区后的数据列表,构建返回arallelCollectionPartition 的列表;
override def getPartitions: Array[Partition] = {
val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
slice方法源码
分区规则
返回一个分区迭代器:迭代器元素为一个个元组,元组中存放每个分区开始和结束位置
开始:分区id * 数据长度 / 分区数
结束:(分区id + 1) * 数据长度 / 分区数
说明
针对range集合,将每个分区编码为一个range;该操作有利于减小内存开销,对于大数据集的处理非常高效;
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
// 保证分区数 > 0
if (numSlices < 1) {
throw new IllegalArgumentException("Positive number of partitions required")
}
// 数据分区实现逻辑:
// 返回一个分区迭代器:迭代器元素为一个个元组,元组中存放每个分区开始和结束位置
// 开始:分区id * 数据长度 / 分区数
// 结束:(分区id + 1) * 数据长度 / 分区数
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map {
i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
seq match {
//对于range集合,将根据分区规则分区后的数据,每个分区构建一个range;最后返回一个新的range集合
case r: Range =>
positions(r.length, numSlices).zipWithIndex.map {
case ((start, end), index) =>
// 针对inclusive的range,最后一个分区使用Inclusive方法构建
if (r.isInclusive && index == numSlices - 1) {
new Range.Inclusive(r.start + start * r.step, r.end, r.step)
}
else {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
}
}.toSeq.asInstanceOf[Seq[Seq[T]]]
// 对于NumericRange集合,将根据分区规则分区后的数据,每个分区构建一个NumericRange;最后返回一个以NumericRange为元素的ArrayBuffer
case nr: NumericRange[_] =>
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
var r = nr
for ((start, end) <- positions(nr.length, numSlices)) {
val sliceSize = end - start
slices += r.take(sliceSize).asInstanceOf[Seq[T]]
r = r.drop(sliceSize)
}
slices
//对于其他的集合,利用array的slice方法根据分区规则对数据进行分区后;最后返回一个新的集合;
case _ =>
val array = seq.toArray // To prevent O(n^2) operations for List etc
positions(array.length, numSlices).map {
case (start, end) =>
array.slice(start, end).toSeq
}.toSeq
}
}
参考文件
spark算子-textFile算子
RDD编程指南
spark 创建算子源码解析
【spark】RDD分区解析
深入源码理解Spark RDD的数据分区原理
边栏推荐
猜你喜欢
随机推荐
每日一题-括号生成-0721
【Shell编程】第一章:子串
多边形等分
来来来,一文让你读懂Cocos Creator如何读写JSON文件
调用TensorFlow Objection Detection API进行目标检测并将检测结果保存至本地
电子产品量产工具(2)- 输入系统实现
Unity3D中的ref、out、Params三种参数的使用
LeetCode刷题之第74题
网站ICP备案是什么呢?
Redis设计与实现(第一部分):数据结构与对象
九、响应处理——内容协商底层原理
阿里云视频点播
游戏引擎除了开发游戏还能做什么?
LeetCode刷题之第55题
spark源码-任务提交流程之-7-流程梳理总结
【UiPath2022+C#】UiPath数据类型
每日一题-无重复字符的最长子串-0712
【UiPath2022+C#】UiPath 数据操作
每日一题-有效的括号-0719
LeetCode刷题之第33题








