Spark operators: the parallelize operator
2022-08-05 06:12:00 【zdaiqing】
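The parallelize operator
About the parallelize operator
What it does
Parallelized collections are created by calling SparkContext's parallelize method on an existing collection in the driver program (a Scala Seq, or the equivalent in Java and other languages). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel.
For example, here is how to create a parallelized collection holding the numbers 1 to 5: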
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
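parallelize source code
Parameters
seq:
  The existing collection to parallelize.
numSlices:
  The degree of parallelism of the parallelized collection, that is, its number of partitions.
  When no value is passed, it defaults to defaultParallelism:
    If the global setting spark.default.parallelism has a value, that value is used.
    If spark.default.parallelism is not set:
      In cluster mode, the larger of the total number of CPU cores and 2 is used.
      In local mode, the total number of CPU cores given to the local master is used (for example, N for local[N]).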
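The fallback order above can be written as a small pure function. This is only a minimal sketch, not Spark's own code: `resolveDefaultParallelism` and its parameters are hypothetical names, and the configured value is modeled as an `Option[Int]` instead of being read from a real `SparkConf`.

```scala
object DefaultParallelismSketch {
  // Hypothetical helper mirroring the fallback order described above.
  //   configured:  value of spark.default.parallelism, if it was set
  //   clusterMode: true for a cluster deployment, false for local mode
  //   totalCores:  total number of CPU cores available to the application
  def resolveDefaultParallelism(
      configured: Option[Int],
      clusterMode: Boolean,
      totalCores: Int): Int =
    configured.getOrElse {
      // Cluster mode never goes below 2; local mode uses the core count as is.
      if (clusterMode) math.max(totalCores, 2) else totalCores
    }
}
```

Under these assumptions, a single-core cluster deployment with no explicit setting still resolves to 2 partitions, while a single-core local run resolves to 1.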
Implementation logic
Under the hood, a ParallelCollectionRDD is constructed from the incoming parameters:
def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
How ParallelCollectionRDD partitions data
getPartitions source code
This method returns the list of partitions:
  It calls the slice method of the ParallelCollectionRDD companion object to split the data into partitions.
  From the resulting list of slices, it builds and returns an array of ParallelCollectionPartition objects.
override def getPartitions: Array[Partition] = {
  val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
  slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
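slice source code
Partitioning rule
The positions helper returns an iterator of tuples; each tuple holds the start (inclusive) and end (exclusive) position of one partition, computed with integer division:
  start: partition id * data length / number of partitions
  end: (partition id + 1) * data length / number of partitions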
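To make the rule concrete, the arithmetic can be run outside Spark. The following is a minimal sketch under stated assumptions: `SlicePositionsSketch` and `cut` are hypothetical names introduced for illustration, and a `List` is returned instead of an iterator so the result is easy to inspect.

```scala
object SlicePositionsSketch {
  // Same arithmetic as the rule above: integer division, with the product
  // computed as a Long so that large lengths do not overflow an Int.
  def positions(length: Long, numSlices: Int): List[(Int, Int)] =
    (0 until numSlices).toList.map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }

  // Cut a sequence at those positions; the end index is exclusive.
  def cut[T](seq: Seq[T], numSlices: Int): List[Seq[T]] =
    positions(seq.length.toLong, numSlices).map { case (start, end) =>
      seq.slice(start, end)
    }
}
```

With 10 elements and 3 partitions, `positions(10L, 3)` gives (0, 3), (3, 6), (6, 10), so the last partition absorbs the remainder. With more partitions than elements, some partitions are empty: `cut(Seq(1, 2, 3), 5)` gives an empty slice, then 1, another empty slice, then 2 and 3.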
Notes
For Range collections, each partition is itself encoded as a Range. Only the start, end, and step are stored instead of every element, which reduces memory overhead and is very efficient for large datasets.
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
  // The number of partitions must be greater than 0
  if (numSlices < 1) {
    throw new IllegalArgumentException("Positive number of partitions required")
  }
  // Partitioning logic:
  // returns an iterator of tuples; each tuple holds the start and end position of one partition
  //   start: partition id * data length / number of partitions
  //   end:   (partition id + 1) * data length / number of partitions
  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
    (0 until numSlices).iterator.map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }
  }
  seq match {
    // For a Range, the data is split according to the partitioning rule and one Range is built
    // per partition; the result is a new sequence of Ranges
    case r: Range =>
      positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
        // For an inclusive Range, the last partition is built with Range.Inclusive
        if (r.isInclusive && index == numSlices - 1) {
          new Range.Inclusive(r.start + start * r.step, r.end, r.step)
        }
        else {
          new Range(r.start + start * r.step, r.start + end * r.step, r.step)
        }
      }.toSeq.asInstanceOf[Seq[Seq[T]]]
    // For a NumericRange, the data is split according to the partitioning rule and one NumericRange
    // is built per partition; the result is an ArrayBuffer whose elements are NumericRanges
    case nr: NumericRange[_] =>
      // For ranges of Long, Double, BigInteger, etc
      val slices = new ArrayBuffer[Seq[T]](numSlices)
      var r = nr
      for ((start, end) <- positions(nr.length, numSlices)) {
        val sliceSize = end - start
        slices += r.take(sliceSize).asInstanceOf[Seq[T]]
        r = r.drop(sliceSize)
      }
      slices
    // For any other collection, the data is converted to an array and split with the array's
    // slice method according to the partitioning rule; the result is a new sequence of slices
    case _ =>
      val array = seq.toArray // To prevent O(n^2) operations for List etc
      positions(array.length, numSlices).map { case (start, end) =>
        array.slice(start, end).toSeq
      }.toSeq
  }
}
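The Range branch of slice can be tried without a Spark cluster. The sketch below is an illustration under stated assumptions, not Spark's code: `RangeSliceSketch` and `sliceRange` are hypothetical names, and the `Range(...)` and `Range.inclusive(...)` companion factories are used in place of the constructors in the source, on the assumption that the factories are more portable across Scala versions.

```scala
object RangeSliceSketch {
  // Split a Range into numSlices sub-ranges without materializing its elements.
  def sliceRange(r: Range, numSlices: Int): List[Range] = {
    require(numSlices >= 1, "Positive number of partitions required")
    val length = r.length.toLong
    (0 until numSlices).toList.map { i =>
      // Same start/end arithmetic as the positions helper in slice.
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      if (r.isInclusive && i == numSlices - 1) {
        // The last slice of an inclusive range keeps the original inclusive end.
        Range.inclusive(r.start + start * r.step, r.end, r.step)
      } else {
        // Every other slice is a half-open range [start, end).
        Range(r.start + start * r.step, r.start + end * r.step, r.step)
      }
    }
  }
}
```

For example, `sliceRange(1 to 10, 3)` yields three ranges covering 1 to 3, 4 to 6, and 7 to 10. Each slice is still a Range, so splitting `1 to 1000000` into 4 partitions creates four small objects rather than copying a million integers.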