Spark operators: map vs mapPartitions
2022-08-05 06:11:00 【zdaiqing】
map vs mapPartitions
1. Source code
1.1. map operator source
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
1.2. mapPartitions operator source
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}
1.3. Comparison
Similarities
Both map and mapPartitions are built on MapPartitionsRDD under the hood.
Differences
Function signature
The function passed to map takes a single element and returns a single element.
The function passed to mapPartitions takes an iterator (a batch of elements) and returns another iterator (a batch of elements); see the sketch after this list.
Number of invocations
With map, the function is applied to every element the operator has to process: as many elements as there are, that many invocations.
With mapPartitions, the iterator holds all elements of one partition and the function processes a whole iterator per call, i.e. it is invoked once per partition.
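To make the contrast concrete, the same per-element doubling can be written with either operator. This is a minimal sketch, assuming an existing SparkContext named sc (e.g. in spark-shell); nums and the doubledBy* names are illustrative:

val nums = sc.parallelize(1 to 10, 2)

// map: the lambda (x => x * 2) is invoked once per element
val doubledByMap = nums.map(x => x * 2)

// mapPartitions: the lambda is invoked once per partition and receives the
// partition's iterator; iter.map keeps the evaluation lazy, element by element
val doubledByMapPartitions = nums.mapPartitions(iter => iter.map(x => x * 2))

As the source above shows, map itself boils down to iter.map(cleanF) inside a MapPartitionsRDD, so both versions produce the same result; they differ only in how often the user function is called.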
1.4. Verifying the number of invocations
Code
import org.apache.spark.{SparkConf, SparkContext}

// Local SparkContext so the snippet runs on its own (in spark-shell, `sc` already exists)
val conf = new SparkConf().setAppName("MapVsMapPartitions").setMaster("local[2]")
val sc = new SparkContext(conf)

val lineSeq = Seq(
  "hello me you her",
  "hello you her",
  "hello her",
  "hello"
)
// 2 partitions, then split each line into words
val rdd = sc.parallelize(lineSeq, 2)
  .flatMap(_.split(" "))

println("===========mapPartitions.start==============")
// "mp+1" is printed once per partition
rdd.mapPartitions(iter => {
  println("mp+1")
  iter.map(x => (x, 1))
}).collect()

println("===========map.start==============")
// "mp+2" is printed once per element
rdd.map(x => {
  println("mp+2")
  (x, 1)
}).collect()
Execution result
With two partitions and ten words after the flatMap, one would expect "mp+1" to be printed 2 times (once per partition) and "mp+2" to be printed 10 times (once per element).
2. Characteristics
map operator
The function is executed once per element: as many elements, as many calls.
mapPartitions operator
The function is executed once per partition: as many partitions, as many calls.
The function takes an iterator as its parameter and returns an iterator.
3. When to use which
For simple per-element transformations, prefer map and avoid mapPartitions:
A mapPartitions function has to return an iterator. When it is used for a simple per-element transformation, the results are often collected into an intermediate in-memory buffer and then turned back into an iterator; if the partition's iterator holds many elements, that buffer can easily cause an OOM (see the first sketch after this section).
When resource initialization is expensive, or when large datasets should be processed in batches, prefer mapPartitions:
Because Spark executes operators in a distributed fashion, each partition needs its own resource initialization. Since mapPartitions invokes its function only once per partition, a single initialization per partition is enough, e.g. in scenarios that require a database connection (see the second sketch after this section).
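The first sketch contrasts the risky materializing pattern with the lazy one, reusing the word RDD rdd from section 1.4; the variable names are illustrative.

// Risky: the whole partition is buffered in memory before an iterator is returned;
// a large partition can blow up the executor with an OOM
val risky = rdd.mapPartitions { iter =>
  val buffer = scala.collection.mutable.ArrayBuffer[(String, Int)]()
  iter.foreach(word => buffer += ((word, 1)))
  buffer.iterator
}

// Safer: transform the iterator lazily, so elements flow through one at a time
val lazyPairs = rdd.mapPartitions(iter => iter.map(word => (word, 1)))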
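The second sketch shows one initialization per partition using plain JDBC; it assumes a JDBC driver is on the classpath, and the connection URL, credentials, and word_count table are hypothetical placeholders.

import java.sql.DriverManager

// One connection per partition instead of one per element
val written = rdd.map(word => (word, 1)).mapPartitions { iter =>
  val conn = DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "pass") // hypothetical URL/credentials
  val stmt = conn.prepareStatement("INSERT INTO word_count(word, cnt) VALUES (?, ?)") // hypothetical table
  // Materialize before closing the connection, otherwise the lazy iterator
  // would try to use a closed connection downstream
  val affected = iter.map { case (word, cnt) =>
    stmt.setString(1, word)
    stmt.setInt(2, cnt)
    stmt.executeUpdate() // number of rows written for this element
  }.toList
  stmt.close()
  conn.close()
  affected.iterator
}
written.count() // action that triggers the job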