Spark operators - the parallelize operator
2022-08-05 06:12:00 【zdaiqing】
The parallelize operator
parallelize operator overview
Functionality
A parallelized collection is created by calling SparkContext's parallelize method on an existing collection (e.g. a Scala Seq) in the driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel.
For example, here is how to create a parallelized collection holding the numbers 1 to 5:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
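Once created, the distributed dataset (distData) can be operated on in parallel. A minimal sketch, assuming sc is an active SparkContext (e.g. in spark-shell):
// Sum the elements of the parallelized collection in parallel
val sum = distData.reduce((a, b) => a + b)   // 15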
parallelize operator source code
Parameters
seq:
the existing collection to be parallelized
numSlices:
the parallelism of the resulting collection (i.e. the number of partitions)
When this parameter is not passed, it defaults to defaultParallelism:
if the global configuration sets spark.default.parallelism, that value is used;
if spark.default.parallelism is not set:
in cluster mode, the maximum of the total number of CPU cores and 2 is used;
in local mode, the total number of CPU cores is used (see the sketch after this list).
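A small sketch of how numSlices affects the partition count (the default depends on the master and on spark.default.parallelism, as described above; 4 is just an explicit choice for illustration):
val data = Array(1, 2, 3, 4, 5)

// Let numSlices fall back to defaultParallelism
val rdd1 = sc.parallelize(data)
println(rdd1.getNumPartitions)   // depends on master / spark.default.parallelism

// Pass numSlices explicitly
val rdd2 = sc.parallelize(data, 4)
println(rdd2.getNumPartitions)   // 4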
Implementation logic
Under the hood, a ParallelCollectionRDD is constructed from the incoming parameters;
def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
Partitioning in ParallelCollectionRDD
getPartitions method source code
This method returns the array of partitions;
it calls the slice method of the ParallelCollectionRDD companion object to partition the data;
from the resulting list of data slices it builds and returns an array of ParallelCollectionPartition objects;
override def getPartitions: Array[Partition] = {
  val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
  slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
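From user code the resulting partitions can be inspected with mapPartitionsWithIndex; a sketch (the grouping shown follows the slice rules described next):
val rdd = sc.parallelize(1 to 10, 3)
println(rdd.partitions.length)   // 3

// Collect (partition index, elements) pairs to see how the data was sliced
rdd.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.toList)))
  .collect()
  .foreach(println)
// (0,List(1, 2, 3))
// (1,List(4, 5, 6))
// (2,List(7, 8, 9, 10))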
slice method source code
Partitioning rule
positions returns an iterator of tuples; each tuple holds the start and end index of one partition:
start: partitionId * dataLength / numPartitions
end: (partitionId + 1) * dataLength / numPartitions
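A worked example of this rule as a standalone sketch (not Spark code): for a 10-element dataset split into 3 partitions, the boundaries are (0,3), (3,6), (6,10), so the last partition absorbs the remainder:
// Standalone replication of the boundary formula
def positions(length: Long, numSlices: Int): Seq[(Int, Int)] =
  (0 until numSlices).map { i =>
    (((i * length) / numSlices).toInt, (((i + 1) * length) / numSlices).toInt)
  }

println(positions(10, 3))   // Vector((0,3), (3,6), (6,10))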
Notes
For Range collections, each partition is itself encoded as a Range; this keeps memory overhead low and is very efficient for large datasets;
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
  // Require a positive number of partitions
  if (numSlices < 1) {
    throw new IllegalArgumentException("Positive number of partitions required")
  }
  // Partitioning logic:
  // returns an iterator of tuples holding the start and end position of each partition
  //   start: partitionId * dataLength / numPartitions
  //   end:   (partitionId + 1) * dataLength / numPartitions
  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
    (0 until numSlices).iterator.map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }
  }
  seq match {
    // For a Range, slice the data according to the partitioning rule and build one Range
    // per partition; the result is a new Seq of Ranges
    case r: Range =>
      positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
        // For an inclusive Range, the last partition is built with Range.Inclusive
        if (r.isInclusive && index == numSlices - 1) {
          new Range.Inclusive(r.start + start * r.step, r.end, r.step)
        } else {
          new Range(r.start + start * r.step, r.start + end * r.step, r.step)
        }
      }.toSeq.asInstanceOf[Seq[Seq[T]]]
    // For a NumericRange, slice the data according to the partitioning rule and build one
    // NumericRange per partition; the result is an ArrayBuffer of NumericRanges
    case nr: NumericRange[_] =>
      // For ranges of Long, Double, BigInteger, etc
      val slices = new ArrayBuffer[Seq[T]](numSlices)
      var r = nr
      for ((start, end) <- positions(nr.length, numSlices)) {
        val sliceSize = end - start
        slices += r.take(sliceSize).asInstanceOf[Seq[T]]
        r = r.drop(sliceSize)
      }
      slices
    // For any other collection, partition the data according to the rule using array.slice;
    // the result is a new Seq of Seqs
    case _ =>
      val array = seq.toArray // To prevent O(n^2) operations for List etc
      positions(array.length, numSlices).map { case (start, end) =>
        array.slice(start, end).toSeq
      }.toSeq
  }
}
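An end-to-end check of the default branch from user code, using glom to look at partition contents (a sketch; a plain Seq goes through the final case above):
// length 5, 2 slices -> boundaries (0,2) and (2,5)
val words = sc.parallelize(Seq("a", "b", "c", "d", "e"), 2)
words.glom().collect().foreach(arr => println(arr.mkString(", ")))
// a, b
// c, d, e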