spark operator-parallelize operator
2022-08-05 06:12:00 【zdaiqing】
The parallelize operator
Description of the parallelize operator
Function description
A parallelized collection is created by calling SparkContext's parallelize method on an existing collection in the driver program (a Scala Seq, or the equivalent in Java, etc.). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel.
For example, here is how to create a parallelized collection holding the numbers 1 to 5:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
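The number of partitions can also be set explicitly by passing it as a second argument; a minimal sketch, assuming the sc and data defined above (the variable name distData3 is made up for illustration):
val distData3 = sc.parallelize(data, 3) // request 3 partitions instead of the default
println(distData3.getNumPartitions)     // 3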
Source code of the parallelize operator
Parameter description
seq:
The existing collection to be parallelized into an RDD.
numSlices:
The degree of parallelism of the parallelized collection (the number of partitions).
When this parameter is not given a value, it defaults to defaultParallelism (see the sketch after this list):
if the globally configured spark.default.parallelism parameter is set, that value is used;
if the global spark.default.parallelism parameter is not set:
in cluster mode, the maximum of the total number of CPU cores and 2 is taken;
in local mode, the total number of CPU cores is taken;
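As a rough illustration of these rules, a minimal local-mode sketch (the master local[4], the app name, and the value 8 are made-up example values, not from the original):
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[4]")                         // local mode: without the property below, defaultParallelism = total cores = 4
  .setAppName("parallelize-default-parallelism") // hypothetical app name
  .set("spark.default.parallelism", "8")         // when set, this value wins
val sc = new SparkContext(conf)

println(sc.defaultParallelism)                     // 8 here; remove the property above to get 4
println(sc.parallelize(1 to 100).getNumPartitions) // numSlices defaults to defaultParallelism
sc.stop()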
Implementation logic
Under the hood, a ParallelCollectionRDD is constructed from the incoming parameters;
def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
Partitioning principle of ParallelCollectionRDD
Source code of the getPartitions method
This method returns an array of partitions;
it calls the slice method of the ParallelCollectionRDD companion object to split the data into partitions;
from the resulting list of data slices it builds and returns a list of ParallelCollectionPartition objects;
override def getPartitions: Array[Partition] = {
  val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
  slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
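A quick way to observe what getPartitions produced is sketched below, assuming an active SparkContext sc; the 5-element Seq and 2 partitions are made-up example values:
val rdd = sc.parallelize(Seq("a", "b", "c", "d", "e"), 2)
println(rdd.partitions.length)                              // 2
rdd.glom().collect().foreach(p => println(p.mkString(","))) // glom groups elements by partition
// a,b     (index range [0, 2))
// c,d,e   (index range [2, 5))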
Source code of the slice method
Partitioning rule
The internal positions helper returns an iterator over partition boundaries: each element is a tuple holding the start and end positions of one partition (a worked example follows)
start: partition id * data length / number of partitions
end: (partition id + 1) * data length / number of partitions
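For instance, a worked example of this rule as a standalone sketch (the length 10 and the 3 partitions are made-up numbers, and PositionsDemo is a hypothetical object name):
object PositionsDemo extends App {
  val length = 10L  // data length
  val numSlices = 3 // number of partitions
  (0 until numSlices).foreach { i =>
    val start = ((i * length) / numSlices).toInt
    val end = (((i + 1) * length) / numSlices).toInt
    println(s"partition $i -> [$start, $end)")
  }
  // partition 0 -> [0, 3)
  // partition 1 -> [3, 6)
  // partition 2 -> [6, 10)   (later partitions absorb the remainder)
}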
Notes
For a Range collection, each partition is encoded as a Range itself; this helps reduce memory overhead and is very efficient for processing large datasets;
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
  // Guarantee that the number of partitions is > 0
  if (numSlices < 1) {
    throw new IllegalArgumentException("Positive number of partitions required")
  }
  // Partitioning logic:
  // returns an iterator over partition boundaries; each element is a tuple holding
  // the start and end positions of one partition
  //   start: partition id * data length / number of partitions
  //   end:   (partition id + 1) * data length / number of partitions
  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
    (0 until numSlices).iterator.map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }
  }
  seq match {
    // For a Range collection, the data is split according to the partitioning rule
    // and one Range is built per partition; a new sequence of Ranges is returned
    case r: Range =>
      positions(r.length, numSlices).zipWithIndex.map {
        case ((start, end), index) =>
          // For an inclusive Range, the last partition is built with Range.Inclusive
          if (r.isInclusive && index == numSlices - 1) {
            new Range.Inclusive(r.start + start * r.step, r.end, r.step)
          }
          else {
            new Range(r.start + start * r.step, r.start + end * r.step, r.step)
          }
      }.toSeq.asInstanceOf[Seq[Seq[T]]]
    // For a NumericRange collection, the data is split according to the partitioning rule
    // and one NumericRange is built per partition; an ArrayBuffer of NumericRanges is returned
    case nr: NumericRange[_] =>
      // For ranges of Long, Double, BigInteger, etc
      val slices = new ArrayBuffer[Seq[T]](numSlices)
      var r = nr
      for ((start, end) <- positions(nr.length, numSlices)) {
        val sliceSize = end - start
        slices += r.take(sliceSize).asInstanceOf[Seq[T]]
        r = r.drop(sliceSize)
      }
      slices
    // For other collections, the data is first copied to an array and then split with
    // array.slice according to the partitioning rule; a new sequence is returned
    case _ =>
      val array = seq.toArray // To prevent O(n^2) operations for List etc
      positions(array.length, numSlices).map {
        case (start, end) =>
          array.slice(start, end).toSeq
      }.toSeq
  }
}
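To observe the Range branch above, a small sketch assuming an active SparkContext sc; 1 to 10 is an inclusive Range, so the last slice is built with Range.Inclusive:
sc.parallelize(1 to 10, 3).glom().collect().foreach(p => println(p.mkString(",")))
// 1,2,3
// 4,5,6
// 7,8,9,10   (the last partition takes the remainder and keeps the inclusive upper end)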