Some descriptions of DS V2 push down in Spark
2022-07-26 10:12:00 【Hongnai riverside bird】
Background
This article is based on Spark 3.3.0.
In previous articles on the FileSourceStrategy, DataSourceStrategy and DataSourceV2Strategy rules in Spark, we mentioned the DS V2 push down features, such as complex push down for JDBC and aggregation push down for Parquet. Behind all of this there is one common prerequisite: the TableCatalog class.
Conclusion
The conclusion first: the prerequisite for these push downs (aggregation push down included) is that the corresponding catalog is configured in Spark, for example:
spark.sql.catalog.h2=org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
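Registering the catalog class alone is not enough to actually query through it; the catalog also needs its JDBC connection options (everything under spark.sql.catalog.h2.* is passed to the catalog's initialize method). A minimal sketch, where the H2 URL and driver are only assumptions for illustration:

// Programmatic equivalent of the spark-defaults entries; must be set before the
// "h2" catalog is first resolved, since catalogs are loaded lazily.
spark.conf.set("spark.sql.catalog.h2",
  "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
spark.conf.set("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb")   // assumed URL
spark.conf.set("spark.sql.catalog.h2.driver", "org.h2.Driver")     // assumed driver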
Analysis
Within the rule V2ScanRelationPushDown, which applies a series of push down steps, the first step is createScanBuilder:
private def createScanBuilder(plan: LogicalPlan) = plan.transform {
case r: DataSourceV2Relation =>
ScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options))
}
Only a DataSourceV2Relation (that is, DS v2) is converted into a ScanBuilderHolder, and the subsequent pushDownFilters and pushDownAggregates rules then operate on that ScanBuilderHolder (only plans containing a ScanBuilderHolder go through the DS v2-specific transformations). So where the DataSourceV2Relation comes from is the key.
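A quick way to check whether these DS v2 rules actually fired is to look at the plan of a query that goes through such a catalog. The snippet below is only a sketch (it assumes the "h2" catalog from the Conclusion section and a hypothetical table h2.test.people); when push down succeeds, the JDBC scan node's description typically lists the pushed filters and aggregates:

// Hypothetical query against the registered "h2" catalog.
val df = spark.sql(
  "SELECT dept, MAX(salary) FROM h2.test.people WHERE salary > 100 GROUP BY dept")
df.explain()
// When V2ScanRelationPushDown kicks in, the scan description typically contains
// entries such as "PushedFilters: ..." and "PushedAggregates: ...".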
Let's get straight to the point: the rule ResolveRelations converts an UnresolvedRelation into either a DataSourceV2Relation or an UnresolvedCatalogRelation:
object ResolveRelations extends Rule[LogicalPlan] {
...
def apply(plan: LogicalPlan)
: LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
case i @ InsertIntoStatement(table, _, _, _, _, _) if i.query.resolved =>
val relation = table match {
case u: UnresolvedRelation if !u.isStreaming =>
lookupRelation(u).getOrElse(u)
case other => other
}
Here lookupRelation decides whether the table is DS V1 or DS V2 according to whether a corresponding catalog is registered:
private def lookupRelation(
u: UnresolvedRelation,
timeTravelSpec: Option[TimeTravelSpec] = None): Option[LogicalPlan] = {
lookupTempView(u.multipartIdentifier, u.isStreaming, timeTravelSpec.isDefined).orElse {
expandIdentifier(u.multipartIdentifier) match {
case CatalogAndIdentifier(catalog, ident) =>
val key = catalog.name +: ident.namespace :+ ident.name
AnalysisContext.get.relationCache.get(key).map(_.transform {
case multi: MultiInstanceRelation =>
val newRelation = multi.newInstance()
newRelation.copyTagsFrom(multi)
newRelation
}).orElse {
val table = CatalogV2Util.loadTable(catalog, ident, timeTravelSpec)
val loaded = createRelation(catalog, ident, table, u.options, u.isStreaming)
loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
loaded
}
case _ => None
}
}
}
...
private def expandIdentifier(nameParts: Seq[String]): Seq[String] = {
if (!isResolvingView || isReferredTempViewName(nameParts)) return nameParts
if (nameParts.length == 1) {
AnalysisContext.get.catalogAndNamespace :+ nameParts.head
} else if (catalogManager.isCatalogRegistered(nameParts.head)) {
nameParts
} else {
AnalysisContext.get.catalogAndNamespace.head +: nameParts
}
}
object CatalogAndIdentifier {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
private val globalTempDB = SQLConf.get.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Identifier)] = {
assert(nameParts.nonEmpty)
if (nameParts.length == 1) {
Some((currentCatalog, Identifier.of(catalogManager.currentNamespace, nameParts.head)))
} else if (nameParts.head.equalsIgnoreCase(globalTempDB)) {
// Conceptually global temp views are in a special reserved catalog. However, the v2 catalog
// API does not support view yet, and we have to use v1 commands to deal with global temp
// views. To simplify the implementation, we put global temp views in a special namespace
// in the session catalog. The special namespace has higher priority during name resolution.
// For example, if the name of a custom catalog is the same with `GLOBAL_TEMP_DATABASE`,
// this custom catalog can't be accessed.
Some((catalogManager.v2SessionCatalog, nameParts.asIdentifier))
} else {
try {
Some((catalogManager.catalog(nameParts.head), nameParts.tail.asIdentifier))
} catch {
case _: CatalogNotFoundException =>
Some((currentCatalog, nameParts.asIdentifier))
}
}
}
}
The expandIdentifier method, combined with CatalogAndIdentifier.unapply, makes the following decisions:
- 1. If no catalog is specified, the default catalog is the v2SessionCatalog, whose name is "spark_catalog" (the name of Spark's default session catalog); go to step 3. Example SQL: select a from table
- 2. If a catalog is specified and it is registered (e.g. spark.sql.catalog.h2=org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog), that catalog is used (for JDBCTableCatalog the catalog name is "h2"); go to step 3. Example SQL: select a from h2.table
- 3. CatalogV2Util.loadTable calls the loadTable method of the corresponding catalog to obtain the Table: the V2SessionCatalog returns a V1Table, while the JDBCTableCatalog returns a JDBCTable (see the sketch after this list).
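To see step 3 concretely, one can load a table directly through the catalog API and check which Table implementation comes back. This is only a sketch: catalogManager is an internal, unstable API, and the namespace and table names are assumptions:

import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}

// The catalog registered above under the name "h2" (internal API, subject to change).
val h2Catalog = spark.sessionState.catalogManager.catalog("h2").asInstanceOf[TableCatalog]
val tbl = h2Catalog.loadTable(Identifier.of(Array("test"), "people"))  // hypothetical table
println(tbl.getClass.getName)  // expected to end with ...datasources.v2.jdbc.JDBCTable
// Loading a table from "spark_catalog" (the v2 session catalog) would return a V1Table instead.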
Next, the createRelation method converts the loaded table into different relations depending on the case:
private def createRelation(
catalog: CatalogPlugin,
ident: Identifier,
table: Option[Table],
options: CaseInsensitiveStringMap,
isStreaming: Boolean): Option[LogicalPlan] = {
...
case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) =>
if (isStreaming) {
if (v1Table.v1Table.tableType == CatalogTableType.VIEW) {
throw QueryCompilationErrors.permanentViewNotSupportedByStreamingReadingAPIError(
ident.quoted)
}
SubqueryAlias(
catalog.name +: ident.asMultipartIdentifier,
UnresolvedCatalogRelation(v1Table.v1Table, options, isStreaming = true))
} else {
v1SessionCatalog.getRelation(v1Table.v1Table, options)
}
...
case table =>
...
} else {
SubqueryAlias(
catalog.name +: ident.asMultipartIdentifier,
DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
- If it is a V1Table, it is converted into an UnresolvedCatalogRelation, which the rule FindDataSourceTable later turns into a LogicalRelation. This is where lookupDataSource comes in, i.e. the registered datasource (such as "org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider" or "org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2", although at present there is no matching case for the latter) takes effect, in the providingInstance() method.
- If it is anything else, it is converted into a DataSourceV2Relation, and the rule V2ScanRelationPushDown then performs the series of push down optimizations on it (see the comparison below).
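A quick way to see which of the two branches a table takes is to compare the analyzed plans of a session-catalog table and a table from the registered h2 catalog. This is just a sketch; the table names are hypothetical:

// A datasource table in the default session catalog is a V1Table and, after
// FindDataSourceTable, typically shows up as a LogicalRelation in the plan:
spark.table("default.some_table").queryExecution.analyzed
// A table from the h2 JDBCTableCatalog stays a DataSourceV2Relation, which
// V2ScanRelationPushDown can subsequently optimize:
spark.table("h2.test.some_table").queryExecution.analyzed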
So for a JDBC catalog, if you want the DS V2 optimizations, you have to configure:
spark.sql.catalog.h2=org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
If you want the DS V2 optimizations for other sources, such as Parquet, you have to implement the corresponding TableCatalog and then configure it, e.g.:
spark.sql.catalog.parquet=org.apache.spark.sql.execution.datasources.v2.jdbc.xxxx
About TableCatalog
Currently, the JDBC datasource and TableCatalog are already implemented in Spark:
## datasource
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
## TableCatalog
org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
If you want to implement another datasource and catalog, you can refer to the JDBC implementation (the current JDBC source is still implemented on DS V1; it would be better to implement it on DS V2, see ParquetDataSourceV2 for reference).
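For orientation, the rough shape of a custom TableCatalog is sketched below. This is not Spark's own ParquetDataSourceV2 code, just a minimal skeleton of the org.apache.spark.sql.connector.catalog.TableCatalog interface with a hypothetical class name; the real work is in returning a Table whose ScanBuilder implements the push down mix-in interfaces (SupportsPushDownFilters, SupportsPushDownAggregates, and so on):

import java.util

import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical catalog; would be registered as spark.sql.catalog.myparquet=<this class>.
class MyParquetTableCatalog extends TableCatalog {
  private var catalogName: String = _
  private var options: CaseInsensitiveStringMap = _

  // Called once with everything configured under spark.sql.catalog.<name>.*
  override def initialize(name: String, opts: CaseInsensitiveStringMap): Unit = {
    catalogName = name
    options = opts
  }

  override def name(): String = catalogName

  // Return a Table whose newScanBuilder() supports the push down mix-ins.
  override def loadTable(ident: Identifier): Table =
    throw new UnsupportedOperationException("left out of this sketch")

  override def listTables(namespace: Array[String]): Array[Identifier] = Array.empty

  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table =
    throw new UnsupportedOperationException("read-only sketch")

  override def alterTable(ident: Identifier, changes: TableChange*): Table =
    throw new UnsupportedOperationException("read-only sketch")

  override def dropTable(ident: Identifier): Boolean = false

  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
    throw new UnsupportedOperationException("read-only sketch")
}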
There are also related discussions in SPARK-28396.
For more on the DS V2 push down features, see "Technology Frontier | DS V2 push-down refactoring and new features in Spark 3.3.0".