Spark operators: the parallelize operator
2022-08-05 06:12:00 【zdaiqing】
The parallelize operator
About the parallelize operator
Functionality
A parallelized collection is created by calling SparkContext's parallelize method on an existing collection in the driver program (a Scala Seq). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel.
For example, here is how to create a parallelized collection holding the numbers 1 to 5:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
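Once created, the distributed dataset (distData) can be operated on in parallel; for example (assuming an active SparkContext sc, e.g. from spark-shell):
// Sum the elements across all partitions
val sum = distData.reduce((a, b) => a + b)  // 15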
Source of the parallelize operator
Parameters
seq:
the existing collection from which the RDD is created
numSlices:
the parallelism of the resulting RDD (its number of partitions)
When this parameter is omitted, it defaults to defaultParallelism (see the sketch below):
if the global spark.default.parallelism setting has a value, that value is used;
if spark.default.parallelism is not set:
in cluster mode, the larger of the total number of CPU cores and 2;
in local mode, the total number of CPU cores.
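This fallback lives in the scheduler backends; below is a condensed sketch, paraphrased from Spark's CoarseGrainedSchedulerBackend and LocalSchedulerBackend (the exact form varies slightly across Spark versions):
// Cluster mode: fall back to max(total cores, 2)
override def defaultParallelism(): Int = {
  conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

// Local mode: fall back to the total number of cores
override def defaultParallelism(): Int =
  scheduler.conf.getInt("spark.default.parallelism", totalCores)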
Implementation
Under the hood, parallelize simply constructs a ParallelCollectionRDD from the given parameters:
def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
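A quick way to observe the default from spark-shell (a sketch; the values shown assume a local[4] master):
sc.defaultParallelism                          // 4 under local[4]
sc.parallelize(1 to 100).getNumPartitions      // 4: numSlices took the default
sc.parallelize(1 to 100, 8).getNumPartitions   // 8: numSlices passed explicitly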
How ParallelCollectionRDD partitions data
Source of the getPartitions method
This method returns the array of partitions;
it calls the slice method of the ParallelCollectionRDD companion object to split the data;
from the resulting slices it builds and returns an array of ParallelCollectionPartition objects.
override def getPartitions: Array[Partition] = {
  val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
  slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
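A convenient way to inspect the resulting partitions from the shell is glom, which gathers each partition's contents into an array (again assuming an active SparkContext sc):
// Five elements over two partitions: the bounds work out to (0,2) and (2,5)
sc.parallelize(Seq("a", "b", "c", "d", "e"), 2).glom().collect()
// Array(Array(a, b), Array(c, d, e))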
Source of the slice method
Partitioning rule
A helper returns an iterator of partition bounds: each element is a tuple holding the start and end offsets of one partition (traced in the sketch below):
start: partitionId * dataLength / numSlices
end: (partitionId + 1) * dataLength / numSlices
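A standalone trace of this arithmetic (re-implemented here purely for illustration):
// Illustrative re-implementation of the bounds formula
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
  (0 until numSlices).iterator.map { i =>
    (((i * length) / numSlices).toInt, (((i + 1) * length) / numSlices).toInt)
  }

positions(10, 3).toList  // List((0,3), (3,6), (6,10))
Because both bounds are rounded down, any remainder is absorbed by the later partitions, and partition sizes differ by at most one element.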
Notes
For Range collections, each partition is itself encoded as a Range; this keeps memory overhead low and is very efficient for large datasets.
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
  // Require a positive number of partitions
  if (numSlices < 1) {
    throw new IllegalArgumentException("Positive number of partitions required")
  }
  // Partitioning logic:
  // returns an iterator of tuples holding the start and end offsets of each partition
  // start: partitionId * dataLength / numSlices
  // end: (partitionId + 1) * dataLength / numSlices
  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
    (0 until numSlices).iterator.map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }
  }
  seq match {
    // For a Range, split the data according to the partitioning rule and build
    // one Range per partition; return a new Seq of Ranges
    case r: Range =>
      positions(r.length, numSlices).zipWithIndex.map {
        case ((start, end), index) =>
          // For an inclusive Range, build the last partition with Range.Inclusive
          if (r.isInclusive && index == numSlices - 1) {
            new Range.Inclusive(r.start + start * r.step, r.end, r.step)
          } else {
            new Range(r.start + start * r.step, r.start + end * r.step, r.step)
          }
      }.toSeq.asInstanceOf[Seq[Seq[T]]]
    // For a NumericRange, split the data according to the partitioning rule and
    // build one NumericRange per partition; return an ArrayBuffer of NumericRanges
    case nr: NumericRange[_] =>
      // For ranges of Long, Double, BigInteger, etc.
      val slices = new ArrayBuffer[Seq[T]](numSlices)
      var r = nr
      for ((start, end) <- positions(nr.length, numSlices)) {
        val sliceSize = end - start
        slices += r.take(sliceSize).asInstanceOf[Seq[T]]
        r = r.drop(sliceSize)
      }
      slices
    // For any other collection, convert it to an array and cut it with
    // Array.slice according to the partitioning rule; return a new Seq of Seqs
    case _ =>
      val array = seq.toArray // To prevent O(n^2) operations for List etc.
      positions(array.length, numSlices).map {
        case (start, end) =>
          array.slice(start, end).toSeq
      }.toSeq
  }
}
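Tying the Range branch back to the bounds above (assuming an active sc): parallelizing the inclusive range 1 to 10 into 3 slices yields two exclusive Ranges plus one Range.Inclusive for the tail:
// positions(10, 3) = (0,3), (3,6), (6,10); the last slice is built with
// Range.Inclusive because 1 to 10 is inclusive
sc.parallelize(1 to 10, 3).glom().collect()
// Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))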