当前位置:网站首页>spark优化
spark优化
2022-08-02 14:05:00 【boyzwz】
一、代码优化
1、避免创建重复的RDD
2、尽可能复用同一个RDD
3、对多次使用的RDD进行持久化
//缓存在内存中,因为不进行序列化与反序列化操作,性能最高,要求内存足够大
studentRDD.cache()
studentRDD.persist(StorageLevel.MEMORY_ONLY)
//若内存溢出,可将RDD数据序列化后再保存在内存中
studentRDD.persist(StorageLevel.MEMORY_ONLY_SER)
//将数据序列化后缓存在内存中,内存缓存不下才会写入磁盘
studentRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
4、尽量避免使用shuffle类算子
5、使用map-side预聚合的shuffle操作
当一个小表join大表时,可将小表广播出去,在map端进行关联(小表不超过1G)
6、使用高性能的算子
reduceByKey/aggregateByKey
mapPartitions
foreachPartitions
coalesce
/**
* repartition: 对rdd重分区,返回一个新的rdd, 会产生shuffle
* repartition可以用于增加分区和减少分区,
* 增加分区可以增加并行度,在资源充足的情况下, 效率更高
* 减少分区可以减少产生的小文件的数量
*
*/
val rePartRDD: RDD[String] = studentsRDD.repartition(10)
println(s"rePartRDD分区数据:${rePartRDD.getNumPartitions}")
/**
* coalesceL 重分区,,可以设置是否产生shuffle
* 如果指定shuffle为true,可以用于增加分区和减少分区
* 如果指定shuffle为false,只能用于减少分区
*
*/
val coalesceRDD: RDD[String] = rePartRDD.coalesce(100, shuffle = true)
println(s"coalesceRDD分区数据:${coalesceRDD.getNumPartitions}")
/**
* 当处理好的数据需要保存到磁盘的时候,如果产生了很多的小文件,可以使用coalesce合并小文件
* 合并的标准:保证合并之后的每一个文件的大小在128M左右
*
* 比如数据保存的数据是10G, 最好的情况是合并为80个
*
* shuffle = false: 不产生shuffle,效率更好
*
*/
coalesceRDD
.coalesce(1, shuffle = false) //合并小文件
.saveAsTextFile("data/coalesce")
7、广播大变量
如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播 后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的 task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低 GC的频率
8、使用Kryo优化序列化性能
使用Kryo序列化器,优化序列化性能
9、优化数据结构
尽量使用字符串替代对象,
使用原始类型(比如 Int、Long)替代字符串,
使用数组替代集合类型,
这样尽可能地减少内存占用 ,从而降低GC频率,提升性能。
10、使用高性能的库fastutil
二、参数优化
spark-submit
--master yarn-cluster
--num-executors = 50 //设定executor的数量
--executor-memory = 4G //设置每个executor的内存大小
--executor-cores = 2 //设置每个executor的核数
--driver-memory = 2G //设置Driver的内存1G-2G
--conf spark.storage.memoryFraction=0.4 //用于缓存的内存占比默认时0.6,如果代码中没有用到缓存 可以将内存分配给shuffle
--conf spark.shuffle.memoryFraction=0.4 //用户shuffle的内存占比默认0.2
--conf spark.locality.wait=10s //再executor中执行前的等待时间 默认3秒
--conf spark.yarn.executor.memoryOverhead=1024 //堆外内存 默认等于堆内存的10%
--conf spark.network.timeout=200s //网络连接的超时时间 默认120s总的内存=num-executors*executor-memory
总的核数=num-executors*executor-cores
spark on yarn 资源设置标准yarn的资源一般占总的资源的80%
1、单个任务总的内存和总的核数一般做多在yarn总资源的1/3到1/2之间2、在上线内再按照需要处理的数据量来合理指定资源 -- 最理想的情况是一个task对应一个core
三、数据倾斜优化
1、使用Hive ETL预处理数据
如果导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀,可以通过Hive来进行数据预处理(即通过Hive ETL预先对 数据按照key进行聚合,或者是预先和其他表进行join)
2、过滤少数导致倾斜的key
将导致数据倾斜的key(对作业的执行和计算结果不是特别重要的话)过滤除去
3、提高shuffle操作的并行度
增加shuffle后的reduce task的数量,减少每一个reduce中分到的数据量,可以缓解数据倾斜
4、双重聚合
第一次是局部聚合,先给每个key 都打上一个随机数,接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合;
然后将各个key的前缀给去掉,再次进行全局聚合操作,就可以得到最终结果了
5、将reduce join转为map join
不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过 collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD 执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每 一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。
6、采样倾斜key并分拆join操作
提高数据本地级别
边栏推荐
- Using the cloud GPU + pycharm training model to realize automatic background run programs, save training results, the server automatically power off
- getUserProfile接口不显示用户性别和地区
- Implementation of redis distributed lock and watchdog
- 基于ThinkPHP6.0 - 宝塔搭建漫画CMS管理系统源码实测
- 我理解的学习金字塔
- MySQL知识总结 (六) MySQL调优
- 执行栈和执行上下文
- 使用云GPU+pycharm训练模型实现后台跑程序、自动保存训练结果、服务器自动关机
- 宝塔搭建PHP自适应懒人网址导航源码实测
- Kubernetes架构和组件
猜你喜欢
随机推荐
uniapp小程序禁止遮罩弹窗下的页面滚动的完美解决办法
Flask framework in-depth
liunx下mysql遇到的简单问题
绕过正则实现SQL注入
Tornado framework routing system introduction and (IOloop.current().start()) start source code analysis
Flink前期代码结构
MySQL知识总结 (二) 存储引擎
Error Correction Design Principle of Hamming Check Code
Unit 11 Serializers
yolov5 improvement (1) Add attention focus mechanism
c语言用scanf出错不安全的解决办法
C语言一级指针(补)
getUserProfile接口不显示用户性别和地区
ng-style:动态控制样式
Unit 14 Viewsets and Routing
C语言日记 2 标识符
猜数字游戏,猜错10次关机(srand、rand、time)随机数生成三板斧(详细讲解!不懂问我!)
存储系统Cache(知识点+例题)
Introduction and use of Haystack
MongoDB Compass 安装与使用