Implementing a Custom Spark Optimization Rule
2022-07-27 14:23:00 【wankunde】
Catalyst optimizer
Spark SQL uses an optimizer called Catalyst to optimize every query written with Spark SQL or the DataFrame DSL, and the optimized queries usually run faster than equivalent programs written directly against RDDs. Catalyst is a rule-based optimizer that ships with many built-in rules; a detailed walk-through of those built-in rules is left for another time. Today we focus on how to write and apply our own optimization rules as plug-ins, without modifying the Spark source code.
Writing an Optimization Rule in Practice
Let's build a simple optimization rule: whenever a numeric column in the select list is multiplied by the literal 1.0, the rule removes the multiplication.
Writing the optimization rule
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.expressions._

object MultiplyOptimizationRule extends Rule[LogicalPlan] with Logging {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    // Match a multiplication whose right operand is the double literal 1.0
    // and replace the whole expression with just the left operand.
    case Multiply(left, right) if right.isInstanceOf[Literal] &&
        right.asInstanceOf[Literal].value.isInstanceOf[Double] &&
        right.asInstanceOf[Literal].value.asInstanceOf[Double] == 1.0 =>
      logInfo("MultiplyOptimizationRule applied")
      left
  }
}
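The chain of isInstanceOf/asInstanceOf checks above can be expressed more idiomatically with the Literal extractor. A minimal equivalent sketch, assuming Spark 2.x where Multiply is a two-argument case class (the object name MultiplyByOneRule is made up for illustration):

import org.apache.spark.sql.catalyst.expressions.{Literal, Multiply}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.DoubleType

// The Literal(1.0, DoubleType) pattern only matches a double literal whose value
// is exactly 1.0, so no explicit casts are needed in a guard.
object MultiplyByOneRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Multiply(left, Literal(1.0, DoubleType)) => left
  }
}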
Registering the optimization rule
Register the rule we just wrote through the interface Spark provides:
spark.experimental.extraOptimizations = Seq(MultiplyOptimizationRule)
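The same registration also works outside spark-shell, in a standalone application. A minimal sketch, assuming the MultiplyOptimizationRule object defined above is on the classpath (the demo object name and local master are illustrative assumptions):

import org.apache.spark.sql.SparkSession

// Hypothetical driver that registers the custom rule and prints the optimized plan.
object ExtraOptimizationsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("extra-optimizations-demo")
      .getOrCreate()

    // The extra rules are picked up by the optimizer as an additional batch.
    spark.experimental.extraOptimizations = Seq(MultiplyOptimizationRule)

    val df = spark.range(10).selectExpr("id * cast(1.0 as double) as id2")
    println(df.queryExecution.optimizedPlan.numberedTreeString)

    spark.stop()
  }
}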
Test results
Testing in spark-shell, we can see that in the Project's output the expression (cast(id#7L as double) * 1.0) AS id2#12 is optimized into cast(id#7L as double) AS id2#14:
scala> val df = spark.range(10).selectExpr("id", "concat('wankun-',id) as name")
df: org.apache.spark.sql.DataFrame = [id: bigint, name: string]
scala> val multipliedDF = df.selectExpr("id * cast(1.0 as double) as id2")
multipliedDF: org.apache.spark.sql.DataFrame = [id2: double]
scala> println(multipliedDF.queryExecution.optimizedPlan.numberedTreeString)
00 Project [(cast(id#7L as double) * 1.0) AS id2#12]
01 +- Range (0, 10, step=1, splits=Some(1))
(The MultiplyOptimizationRule definition shown above is pasted into the spark-shell session here.)
scala> spark.experimental.extraOptimizations = Seq(MultiplyOptimizationRule)
spark.experimental.extraOptimizations: Seq[org.apache.spark.sql.catalyst.rules.Rule[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]] = List(MultiplyOptimizationRule$@675d209c)
scala>
scala> val multipliedDFWithOptimization = df.selectExpr("id * cast(1.0 as double) as id2")
multipliedDFWithOptimization: org.apache.spark.sql.DataFrame = [id2: double]
scala> println(multipliedDFWithOptimization.queryExecution.optimizedPlan.numberedTreeString)
00 Project [cast(id#7L as double) AS id2#14]
01 +- Range (0, 10, step=1, splits=Some(1))
Hooks and extension points
The example above shows that we can add custom optimization rules through Spark's programming interface.
However, the spark-sql CLI gives us no place to call that interface, and Catalyst also has several phases besides optimization, such as Analysis, Logical Optimization and Physical Planning; extending those phases is not convenient with the approach above. For this reason, Spark 2.2 introduced a more powerful feature: hooks and extension points.

Writing a custom optimization rule and extension point
Using the same functionality as the example:
package com.wankun.sql.optimizer

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{Literal, Multiply}
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.Decimal

/**
 * @author kun.wan
 * @date 2020-03-03.
 */
case class MyRule(spark: SparkSession) extends Rule[LogicalPlan] with Logging {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    logInfo("Applying the MyRule optimization rule")
    plan transformAllExpressions {
      // In SQL text, 1.0 is parsed as a decimal literal, so match a Decimal value of 1.0.
      case Multiply(left, right) if right.isInstanceOf[Literal] &&
          right.asInstanceOf[Literal].value.isInstanceOf[Decimal] &&
          right.asInstanceOf[Literal].value.asInstanceOf[Decimal].toDouble == 1.0 =>
        logInfo("MyRule optimization rule applied")
        left
    }
  }
}

class MyExtensions extends (SparkSessionExtensions => Unit) with Logging {
  def apply(e: SparkSessionExtensions): Unit = {
    logInfo("Entering the MyExtensions extension point")
    e.injectResolutionRule(MyRule)
  }
}
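Note that MyExtensions injects the rule with injectResolutionRule, so it runs during analysis. If you would rather have it run during logical optimization, SparkSessionExtensions also exposes injectOptimizerRule. A minimal sketch, placed in the same file as MyRule above (the class name MyOptimizerExtensions is made up for illustration):

class MyOptimizerExtensions extends (SparkSessionExtensions => Unit) with Logging {
  def apply(e: SparkSessionExtensions): Unit = {
    logInfo("Entering the MyOptimizerExtensions extension point")
    // MyRule(spark) is built per session and added to the optimizer instead of the analyzer.
    e.injectOptimizerRule(MyRule)
  }
}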
Package the code above as spark-extensions-1.0.jar.
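The jar name and the target/ path in the command below suggest a Maven build; if you prefer sbt, a minimal build.sbt along these lines should work too (the Spark and Scala versions are assumptions matching the era of this post, and sbt would place the jar under target/scala-2.11/ with a slightly different file name):

// Minimal sbt build sketch; spark-sql is marked "provided" because it is already
// on the classpath when the jar is loaded by spark-sql / spark-submit.
name := "spark-extensions"

version := "1.0"

scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5" % "provided"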
Configuring and enabling the custom Spark extension
spark-sql --master local --conf spark.sql.extensions=com.wankun.sql.optimizer.MyExtensions --jars /Users/wankun/ws/wankun/spark-extensions/target/spark-extensions-1.0.jar
Testing the optimization rule
In the analyzed plan below, the multiplication has disappeared, which shows that the custom optimization rule has taken effect.
spark-sql> explain extended
> with stu as (
> select 1 as id, 'wankun-1' as name
> union
> select 2 as id, 'wankun-2' as name
> union
> select 3 as id, 'wankun-3' as name
> )
> select id * 1.0
> from stu;
20/03/04 01:56:16 INFO MyRule: org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54) Applying the MyRule optimization rule
20/03/04 01:56:17 INFO MyRule: org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54) Applying the MyRule optimization rule
20/03/04 01:56:17 INFO MyRule: org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54) Applying the MyRule optimization rule
20/03/04 01:56:17 INFO MyRule: org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54) MyRule optimization rule applied
20/03/04 01:56:17 INFO MyRule: org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54) Applying the MyRule optimization rule
20/03/04 01:56:17 INFO MyRule: org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54) Applying the MyRule optimization rule
20/03/04 01:56:17 INFO MyRule: org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54) Applying the MyRule optimization rule
20/03/04 01:56:18 INFO CodeGenerator: org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54) Code generated in 156.862003 ms
== Parsed Logical Plan ==
CTE [stu]
: +- SubqueryAlias `stu`
: +- Distinct
: +- Union
: :- Distinct
: : +- Union
: : :- Project [1 AS id#0, wankun-1 AS name#1]
: : : +- OneRowRelation
: : +- Project [2 AS id#2, wankun-2 AS name#3]
: : +- OneRowRelation
: +- Project [3 AS id#4, wankun-3 AS name#5]
: +- OneRowRelation
+- 'Project [unresolvedalias(('id * 1.0), None)]
+- 'UnresolvedRelation `stu`
== Analyzed Logical Plan ==
id: decimal(10,0)
Project [cast(id#0 as decimal(10,0)) AS id#8]
+- SubqueryAlias `stu`
+- Distinct
+- Union
:- Distinct
: +- Union
: :- Project [1 AS id#0, wankun-1 AS name#1]
: : +- OneRowRelation
: +- Project [2 AS id#2, wankun-2 AS name#3]
: +- OneRowRelation
+- Project [3 AS id#4, wankun-3 AS name#5]
+- OneRowRelation
== Optimized Logical Plan ==
Aggregate [id#0, name#1], [cast(id#0 as decimal(10,0)) AS id#8]
+- Union
:- Project [1 AS id#0, wankun-1 AS name#1]
: +- OneRowRelation
:- Project [2 AS id#2, wankun-2 AS name#3]
: +- OneRowRelation
+- Project [3 AS id#4, wankun-3 AS name#5]
+- OneRowRelation
== Physical Plan ==
*(5) HashAggregate(keys=[id#0, name#1], functions=[], output=[id#8])
+- Exchange hashpartitioning(id#0, name#1, 200)
+- *(4) HashAggregate(keys=[id#0, name#1], functions=[], output=[id#0, name#1])
+- Union
:- *(1) Project [1 AS id#0, wankun-1 AS name#1]
: +- Scan OneRowRelation[]
:- *(2) Project [2 AS id#2, wankun-2 AS name#3]
: +- Scan OneRowRelation[]
+- *(3) Project [3 AS id#4, wankun-3 AS name#5]
+- Scan OneRowRelation[]
Time taken: 1.945 seconds, Fetched 1 row(s)
SparkSession leaves these extension points open to users. The Catalyst extension points were introduced in SPARK-18127 and let Spark users plug custom implementations into every stage of SQL processing, which is both powerful and efficient. The main injection methods on SparkSessionExtensions are listed below (a registration sketch follows the list):
- injectOptimizerRule – add a custom optimizer rule; the optimizer rewrites the logical plan, which is the stage our first example extended through extraOptimizations.
- injectParser – add a custom parser; the parser is responsible for SQL parsing.
- injectPlannerStrategy – add a custom planner strategy; the planner generates the physical plan.
- injectResolutionRule – add a custom analyzer rule to the resolution phase; the analyzer produces the resolved logical plan (our MyExtensions example uses this method).
- injectPostHocResolutionRule – add a custom analyzer rule to the post-resolution phase.
- injectCheckRule – add a custom analyzer check rule.
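Besides setting spark.sql.extensions as in the spark-sql command above, the same extension function can be registered programmatically when the session is built. A minimal sketch, assuming the MyExtensions class from this post is on the classpath (master and app name are illustrative assumptions):

import org.apache.spark.sql.SparkSession

// Wire the extensions into the session builder instead of using spark.sql.extensions.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("extensions-demo")
  .withExtensions(new com.wankun.sql.optimizer.MyExtensions)
  .getOrCreate()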
