当前位置:网站首页>spark优化
spark优化
2022-08-02 14:05:00 【boyzwz】
一、代码优化
1、避免创建重复的RDD
2、尽可能复用同一个RDD
3、对多次使用的RDD进行持久化
//缓存在内存中,因为不进行序列化与反序列化操作,性能最高,要求内存足够大
studentRDD.cache()
studentRDD.persist(StorageLevel.MEMORY_ONLY)
//若内存溢出,可将RDD数据序列化后再保存在内存中
studentRDD.persist(StorageLevel.MEMORY_ONLY_SER)
//将数据序列化后缓存在内存中,内存缓存不下才会写入磁盘
studentRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
4、尽量避免使用shuffle类算子
5、使用map-side预聚合的shuffle操作
当一个小表join大表时,可将小表广播出去,在map端进行关联(小表不超过1G)
6、使用高性能的算子
reduceByKey/aggregateByKey
mapPartitions
foreachPartitions
coalesce
/**
* repartition: 对rdd重分区,返回一个新的rdd, 会产生shuffle
* repartition可以用于增加分区和减少分区,
* 增加分区可以增加并行度,在资源充足的情况下, 效率更高
* 减少分区可以减少产生的小文件的数量
*
*/
val rePartRDD: RDD[String] = studentsRDD.repartition(10)
println(s"rePartRDD分区数据:${rePartRDD.getNumPartitions}")
/**
* coalesceL 重分区,,可以设置是否产生shuffle
* 如果指定shuffle为true,可以用于增加分区和减少分区
* 如果指定shuffle为false,只能用于减少分区
*
*/
val coalesceRDD: RDD[String] = rePartRDD.coalesce(100, shuffle = true)
println(s"coalesceRDD分区数据:${coalesceRDD.getNumPartitions}")
/**
* 当处理好的数据需要保存到磁盘的时候,如果产生了很多的小文件,可以使用coalesce合并小文件
* 合并的标准:保证合并之后的每一个文件的大小在128M左右
*
* 比如数据保存的数据是10G, 最好的情况是合并为80个
*
* shuffle = false: 不产生shuffle,效率更好
*
*/
coalesceRDD
.coalesce(1, shuffle = false) //合并小文件
.saveAsTextFile("data/coalesce")
7、广播大变量
如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播 后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的 task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低 GC的频率
8、使用Kryo优化序列化性能
使用Kryo序列化器,优化序列化性能
9、优化数据结构
尽量使用字符串替代对象,
使用原始类型(比如 Int、Long)替代字符串,
使用数组替代集合类型,
这样尽可能地减少内存占用 ,从而降低GC频率,提升性能。
10、使用高性能的库fastutil
二、参数优化
spark-submit
--master yarn-cluster
--num-executors = 50 //设定executor的数量
--executor-memory = 4G //设置每个executor的内存大小
--executor-cores = 2 //设置每个executor的核数
--driver-memory = 2G //设置Driver的内存1G-2G
--conf spark.storage.memoryFraction=0.4 //用于缓存的内存占比默认时0.6,如果代码中没有用到缓存 可以将内存分配给shuffle
--conf spark.shuffle.memoryFraction=0.4 //用户shuffle的内存占比默认0.2
--conf spark.locality.wait=10s //再executor中执行前的等待时间 默认3秒
--conf spark.yarn.executor.memoryOverhead=1024 //堆外内存 默认等于堆内存的10%
--conf spark.network.timeout=200s //网络连接的超时时间 默认120s总的内存=num-executors*executor-memory
总的核数=num-executors*executor-cores
spark on yarn 资源设置标准yarn的资源一般占总的资源的80%
1、单个任务总的内存和总的核数一般做多在yarn总资源的1/3到1/2之间2、在上线内再按照需要处理的数据量来合理指定资源 -- 最理想的情况是一个task对应一个core
三、数据倾斜优化
1、使用Hive ETL预处理数据
如果导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀,可以通过Hive来进行数据预处理(即通过Hive ETL预先对 数据按照key进行聚合,或者是预先和其他表进行join)
2、过滤少数导致倾斜的key
将导致数据倾斜的key(对作业的执行和计算结果不是特别重要的话)过滤除去
3、提高shuffle操作的并行度
增加shuffle后的reduce task的数量,减少每一个reduce中分到的数据量,可以缓解数据倾斜
4、双重聚合
第一次是局部聚合,先给每个key 都打上一个随机数,接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合;
然后将各个key的前缀给去掉,再次进行全局聚合操作,就可以得到最终结果了
5、将reduce join转为map join
不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过 collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD 执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每 一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。
6、采样倾斜key并分拆join操作
提高数据本地级别
边栏推荐
猜你喜欢
随机推荐
无人驾驶综述:等级划分
Web Design (Beginners) [easy to understand]
十分钟带你入门Nodejs
C语言一维数组练习——将一个字符串中的某个字符替换成其它字符
window10 lower semi-automatic labeling
无人驾驶综述:国外国内发展历程
Creating seven NiuYun Flask project complete and let cloud
Paddle window10 environment using conda installation
uni-app页面、组件视图数据无法刷新问题的解决办法
Linux: CentOS 7 install MySQL5.7
MarkDown syntax summary
安装使用——百家CMS微商城说明文档(2)
Flask framework in-depth
ThinkPHP5.0内置分页函数Paginate无法获取POST页数问题的解决办法
Flask-RESTful request response and SQLAlchemy foundation
Unit 14 Viewsets and Routing
加减法运算及其溢出处理
drf source code analysis and global catch exception
重新学习编程day1 【初始c语言】【c语言编写出计算两个数之和的代码】
Eslint规则大全