spark operator-parallelize operator
2022-08-05 06:12:00 【zdaiqing】
The parallelize operator
Description of the parallelize operator
Functionality
A parallelized collection is created by calling SparkContext's parallelize method on an existing collection in the driver program (Scala, Java, etc.). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel.
For example, here is how to create a parallelized collection holding the numbers 1 to 5:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
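Once created, the distributed dataset (distData) can be operated on in parallel. For example, a minimal follow-up using the sc and distData above:

val sum = distData.reduce((a, b) => a + b)  // 15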
Source code of the parallelize operator
Parameters
seq:
The existing collection to be parallelized.
numSlices:
The degree of parallelism of the parallelized collection (the number of partitions).
When this parameter is not passed, it defaults to defaultParallelism:
If the global spark.default.parallelism configuration is set, its value is used;
If the global spark.default.parallelism configuration is not set:
in cluster mode, the larger of the total number of CPU cores and 2 is used;
in local mode, the total number of CPU cores is used.
A sketch of this default behavior follows below.
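A minimal sketch of this behavior in local mode (the local[4] master URL and the application name are illustrative assumptions):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[4]").setAppName("parallelize-demo")
// conf.set("spark.default.parallelism", "8")  // uncomment to override the default explicitly
val sc = new SparkContext(conf)

println(sc.defaultParallelism)                         // 4 under local[4] when the config is not set
println(sc.parallelize(1 to 100).getNumPartitions)     // follows defaultParallelism
println(sc.parallelize(1 to 100, 10).getNumPartitions) // 10 when numSlices is passed explicitly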
Implementation
Under the hood, a ParallelCollectionRDD is constructed from the incoming parameters:
def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
Partitioning in ParallelCollectionRDD
Source code of the getPartitions method
This method returns an array of partitions:
It calls the slice method of the ParallelCollectionRDD companion object to partition the data;
From the resulting list of slices, it builds and returns an array of ParallelCollectionPartition objects.
override def getPartitions: Array[Partition] = {
  val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
  slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
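As a quick check (a sketch assuming a running SparkContext sc), the number of ParallelCollectionPartition objects equals the numSlices argument:

val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), numSlices = 2)
println(rdd.partitions.length)              // 2: one ParallelCollectionPartition per slice
println(rdd.partitions.map(_.index).toList) // List(0, 1)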
Source code of the slice method
Partitioning rule
The internal positions helper returns an iterator of tuples; each tuple holds the start and end positions of one partition:
start = partition id * data length / number of partitions
end = (partition id + 1) * data length / number of partitions
A worked example follows below.
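For example, a standalone re-implementation of this rule (a hypothetical helper, not the Spark-internal method itself) shows how 10 elements are split into 3 partitions of sizes 3, 3 and 4:

def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
  (0 until numSlices).iterator.map { i =>
    (((i * length) / numSlices).toInt, (((i + 1) * length) / numSlices).toInt)
  }

positions(10, 3).toList  // List((0,3), (3,6), (6,10))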
Notes
For a Range collection, each partition is encoded as a Range itself; this reduces memory overhead and is very efficient for large datasets.
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
  // Ensure the number of partitions is > 0
  if (numSlices < 1) {
    throw new IllegalArgumentException("Positive number of partitions required")
  }
  // Partitioning logic:
  // returns an iterator of tuples; each tuple holds the start and end positions of one partition
  // start = partition id * data length / number of partitions
  // end   = (partition id + 1) * data length / number of partitions
  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
    (0 until numSlices).iterator.map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }
  }
  seq match {
    // For a Range, partition the data according to the rule above, build one Range per
    // partition, and return a new sequence of Ranges
    case r: Range =>
      positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
        // For an inclusive Range, the last partition is built with Range.Inclusive
        if (r.isInclusive && index == numSlices - 1) {
          new Range.Inclusive(r.start + start * r.step, r.end, r.step)
        } else {
          new Range(r.start + start * r.step, r.start + end * r.step, r.step)
        }
      }.toSeq.asInstanceOf[Seq[Seq[T]]]
    // For a NumericRange, partition the data according to the rule above, build one
    // NumericRange per partition, and return an ArrayBuffer of NumericRanges
    case nr: NumericRange[_] =>
      // For ranges of Long, Double, BigInteger, etc
      val slices = new ArrayBuffer[Seq[T]](numSlices)
      var r = nr
      for ((start, end) <- positions(nr.length, numSlices)) {
        val sliceSize = end - start
        slices += r.take(sliceSize).asInstanceOf[Seq[T]]
        r = r.drop(sliceSize)
      }
      slices
    // For other collections, convert to an array and cut it with array.slice according to
    // the rule above, returning a new sequence of slices
    case _ =>
      val array = seq.toArray // To prevent O(n^2) operations for List etc
      positions(array.length, numSlices).map { case (start, end) =>
        array.slice(start, end).toSeq
      }.toSeq
  }
}
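Putting it together, a minimal sketch (assuming a running SparkContext sc) that uses glom to inspect how a Range is split across partitions, matching the rule above:

val rdd = sc.parallelize(1 to 10, 3)
rdd.glom().collect().foreach(p => println(p.mkString(",")))
// 1,2,3
// 4,5,6
// 7,8,9,10   (the last slice covers the inclusive upper bound via Range.Inclusive)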