当前位置:网站首页>spark算子-parallelize算子
spark算子-parallelize算子
2022-08-05 05:18:00 【zdaiqing】
parallelize算子
parallelize算子说明
功能说明
通过在驱动程序(Scala/Java其他)中的现有集合上调用SparkContext'方法来创建并行化集合。复制集合的元素以形成可以并行操作的分布式数据集。
例如,这里是如何创建一个包含数字 1 到 5 的并行化集合:parallelizeSeq
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
parallelize算子源码
参数说明
seq:
需要进行并行化集合创建的现有集合
numSlices:
并行化集合的并行度(分区数)
该参数没传值情况下,默认赋值defaultParallelism:
全局配置的spark.default.parallelism参数有值,就取参数值;
全局配置的spark.default.parallelism参数没有设置:
集群模式下,在cpu总核数 vs 2之间取最大值;
local模式下,取cpu总核数;
实现逻辑
底层根据传入参数构建ParallelCollectionRDD;
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
ParallelCollectionRDD的分区原理
getPartitions方法源码
该方法返回分区列表;
调用ParallelCollectionRDD伴生对象中slice方法实现数据分区;
根据分区后的数据列表,构建返回arallelCollectionPartition 的列表;
override def getPartitions: Array[Partition] = {
val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
slice方法源码
分区规则
返回一个分区迭代器:迭代器元素为一个个元组,元组中存放每个分区开始和结束位置
开始:分区id * 数据长度 / 分区数
结束:(分区id + 1) * 数据长度 / 分区数
说明
针对range集合,将每个分区编码为一个range;该操作有利于减小内存开销,对于大数据集的处理非常高效;
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
// 保证分区数 > 0
if (numSlices < 1) {
throw new IllegalArgumentException("Positive number of partitions required")
}
// 数据分区实现逻辑:
// 返回一个分区迭代器:迭代器元素为一个个元组,元组中存放每个分区开始和结束位置
// 开始:分区id * 数据长度 / 分区数
// 结束:(分区id + 1) * 数据长度 / 分区数
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map {
i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
seq match {
//对于range集合,将根据分区规则分区后的数据,每个分区构建一个range;最后返回一个新的range集合
case r: Range =>
positions(r.length, numSlices).zipWithIndex.map {
case ((start, end), index) =>
// 针对inclusive的range,最后一个分区使用Inclusive方法构建
if (r.isInclusive && index == numSlices - 1) {
new Range.Inclusive(r.start + start * r.step, r.end, r.step)
}
else {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
}
}.toSeq.asInstanceOf[Seq[Seq[T]]]
// 对于NumericRange集合,将根据分区规则分区后的数据,每个分区构建一个NumericRange;最后返回一个以NumericRange为元素的ArrayBuffer
case nr: NumericRange[_] =>
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
var r = nr
for ((start, end) <- positions(nr.length, numSlices)) {
val sliceSize = end - start
slices += r.take(sliceSize).asInstanceOf[Seq[T]]
r = r.drop(sliceSize)
}
slices
//对于其他的集合,利用array的slice方法根据分区规则对数据进行分区后;最后返回一个新的集合;
case _ =>
val array = seq.toArray // To prevent O(n^2) operations for List etc
positions(array.length, numSlices).map {
case (start, end) =>
array.slice(start, end).toSeq
}.toSeq
}
}
参考文件
spark算子-textFile算子
RDD编程指南
spark 创建算子源码解析
【spark】RDD分区解析
深入源码理解Spark RDD的数据分区原理
边栏推荐
- Redis设计与实现(第二部分):单机数据库的实现
- 如何使用Houdini进行程序化优化?
- 电子产品量产工具(1)- 显示系统实现
- 2020,Laya最新中高级面试灵魂32问,你都知道吗?
- 腾讯云云函数SCF—入门须知
- [Paper Intensive Reading] Rich Feature Hierarchies for Accurate Object Detection and Semantic Segmentation (R-CNN)
- OpenCV3.0 兼容VS2010与VS2013的问题
- 偷题——腾讯游戏开发面试问题及解答
- 【ts】typescript高阶:联合类型与交叉类型
- LeetCode刷题之第23题
猜你喜欢
随机推荐
Cocos Creator小游戏案例《棍子士兵》
腾讯云消息队列CMQ
C语言程序死循环问题解析——变量被修改
Autoware中安装Yolo3目标检测模块遇到的问题
ROS视频教程
UE5再次更新!扫描或手动建模面部模型可直接转为绑定好的Metahuman
C语言—三子棋的实现
电子产品量产工具(2)- 输入系统实现
【ts】typeScript高阶:any和unknown
电子产品量产工具(5)- 页面系统实现
七、请求处理——Map、Model类型参数处理原理
新一代解析技术——云解析
每日一题-电话号码的字母组合-0717
LeetCode刷题之第416题
Lua,ILRuntime, HybridCLR(wolong)/huatuo热更对比分析
腾讯内部技术:《轩辕传奇》服务器架构演变
每日一题-最长回文子串-0714
【UiPath2022+C#】UiPath If条件语句
手把手教你搭建小程序
每日一题-正则表达式匹配-0715









