Some descriptions of DS V2 push down in Spark
2022-07-26 10:12:00 【Hongnai riverside bird】
Background
This article is based on Spark 3.3.0.
In the previous articles on Spark's FileSourceStrategy, DataSourceStrategy, and DataSourceV2Strategy rules, we mentioned the DS V2 push down features, such as complex push down for JDBC and aggregation push down for Parquet. Behind all of them there is one common prerequisite, namely the TableCatalog class.
Conclusion
To state the conclusion first: the prerequisite for these push down optimizations is that the corresponding catalog is registered in Spark, for example:
spark.sql.catalog.h2=org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
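As an illustration (a minimal sketch, assuming an H2 database; the JDBC URL, driver, and table names below are made-up examples), the catalog is registered on the SparkSession and tables are then queried with a catalog-qualified name:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  // Register a DS V2 TableCatalog under the name "h2"; this is what enables V2 push down.
  .config("spark.sql.catalog.h2",
    "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
  // Connection options are read from the same "spark.sql.catalog.h2.*" prefix (assumed values).
  .config("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb")
  .config("spark.sql.catalog.h2.driver", "org.h2.Driver")
  .getOrCreate()

// Referencing the table through the catalog name lets the query resolve to a
// DataSourceV2Relation and benefit from filter/aggregate push down.
spark.sql("SELECT dept, SUM(salary) FROM h2.test.employee GROUP BY dept").show()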
Analysis
The rule V2ScanRelationPushDown applies a series of sub-rules; the first one is createScanBuilder:
private def createScanBuilder(plan: LogicalPlan) = plan.transform {
  case r: DataSourceV2Relation =>
    ScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options))
}
Only a DataSourceV2Relation, i.e. DS v2, is converted into a ScanBuilderHolder, and the subsequent pushDownFilters and pushDownAggregates rules do their transformations based on that ScanBuilderHolder (only where a ScanBuilderHolder is present are the DS v2-specific rules applied). So the key question is where the DataSourceV2Relation comes from.
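A quick way to confirm that a query has reached this DS v2 path is to look for a DataSourceV2Relation in the analyzed plan (a minimal sketch; the catalog and table names are assumptions):

// Check whether the analyzed plan contains a DataSourceV2Relation, which is the
// precondition for createScanBuilder to produce a ScanBuilderHolder.
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

val df = spark.sql("SELECT a FROM h2.test.tbl")
val usesV2 = df.queryExecution.analyzed.collectFirst {
  case r: DataSourceV2Relation => r
}.isDefined
println(s"resolved to DataSourceV2Relation: $usesV2")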
Let's get straight to the point:
The rule ResolveRelations converts an UnresolvedRelation into either a DataSourceV2Relation or an UnresolvedCatalogRelation:
object ResolveRelations extends Rule[LogicalPlan] {
  ...
  def apply(plan: LogicalPlan)
      : LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
    case i @ InsertIntoStatement(table, _, _, _, _, _) if i.query.resolved =>
      val relation = table match {
        case u: UnresolvedRelation if !u.isStreaming =>
          lookupRelation(u).getOrElse(u)
        case other => other
      }
Here lookupRelation decides whether the table goes down the DS V1 or the DS V2 path, depending on whether a corresponding catalog is registered:
private def lookupRelation(
    u: UnresolvedRelation,
    timeTravelSpec: Option[TimeTravelSpec] = None): Option[LogicalPlan] = {
  lookupTempView(u.multipartIdentifier, u.isStreaming, timeTravelSpec.isDefined).orElse {
    expandIdentifier(u.multipartIdentifier) match {
      case CatalogAndIdentifier(catalog, ident) =>
        val key = catalog.name +: ident.namespace :+ ident.name
        AnalysisContext.get.relationCache.get(key).map(_.transform {
          case multi: MultiInstanceRelation =>
            val newRelation = multi.newInstance()
            newRelation.copyTagsFrom(multi)
            newRelation
        }).orElse {
          val table = CatalogV2Util.loadTable(catalog, ident, timeTravelSpec)
          val loaded = createRelation(catalog, ident, table, u.options, u.isStreaming)
          loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
          loaded
        }
      case _ => None
    }
  }
}
...
private def expandIdentifier(nameParts: Seq[String]): Seq[String] = {
  if (!isResolvingView || isReferredTempViewName(nameParts)) return nameParts

  if (nameParts.length == 1) {
    AnalysisContext.get.catalogAndNamespace :+ nameParts.head
  } else if (catalogManager.isCatalogRegistered(nameParts.head)) {
    nameParts
  } else {
    AnalysisContext.get.catalogAndNamespace.head +: nameParts
  }
}
object CatalogAndIdentifier {
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper

  private val globalTempDB = SQLConf.get.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)

  def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Identifier)] = {
    assert(nameParts.nonEmpty)
    if (nameParts.length == 1) {
      Some((currentCatalog, Identifier.of(catalogManager.currentNamespace, nameParts.head)))
    } else if (nameParts.head.equalsIgnoreCase(globalTempDB)) {
      // Conceptually global temp views are in a special reserved catalog. However, the v2 catalog
      // API does not support view yet, and we have to use v1 commands to deal with global temp
      // views. To simplify the implementation, we put global temp views in a special namespace
      // in the session catalog. The special namespace has higher priority during name resolution.
      // For example, if the name of a custom catalog is the same with `GLOBAL_TEMP_DATABASE`,
      // this custom catalog can't be accessed.
      Some((catalogManager.v2SessionCatalog, nameParts.asIdentifier))
    } else {
      try {
        Some((catalogManager.catalog(nameParts.head), nameParts.tail.asIdentifier))
      } catch {
        case _: CatalogNotFoundException =>
          Some((currentCatalog, nameParts.asIdentifier))
      }
    }
  }
}
The expandIdentifier method, combined with CatalogAndIdentifier.unapply, makes the following decision (a small worked example follows this list):

1. If no catalog is specified, the default catalog is the v2SessionCatalog, whose name is "spark_catalog" (Spark's default session catalog name); continue with step 3. Example SQL: select a from table
2. If a catalog is specified and it is registered (e.g. spark.sql.catalog.h2=org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog), that catalog is used (for the JDBCTableCatalog above the catalog name is "h2"); continue with step 3. Example SQL: select a from h2.table
3. CatalogV2Util.loadTable calls the loadTable method of the resolved catalog to obtain the corresponding Table: the V2SessionCatalog returns a V1Table, while the JDBCTableCatalog returns a JDBCTable.
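To make this decision concrete, here is a small, self-contained sketch of the same name-resolution logic (this is not Spark code; the registeredCatalogs set and catalog names are assumptions for illustration):

// Simplified illustration of CatalogAndIdentifier.unapply: split a multipart
// table name into (catalog, identifier) the way the analyzer does.
val registeredCatalogs = Set("spark_catalog", "h2") // from spark.sql.catalog.* entries
val currentCatalog = "spark_catalog"                // Spark's default session catalog

def resolve(nameParts: Seq[String]): (String, Seq[String]) = nameParts match {
  // Single-part name: use the current (session) catalog, e.g. "select a from table"
  case Seq(tbl) => (currentCatalog, Seq(tbl))
  // First part is a registered catalog, e.g. "select a from h2.db.table"
  case head +: rest if registeredCatalogs.contains(head) => (head, rest)
  // Otherwise the whole name is an identifier inside the current catalog
  case parts => (currentCatalog, parts)
}

println(resolve(Seq("table")))           // (spark_catalog,List(table))
println(resolve(Seq("h2", "db", "tbl"))) // (h2,List(db, tbl))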
Next, the createRelation method converts the loaded table into a different relation depending on the case:
private def createRelation(
    catalog: CatalogPlugin,
    ident: Identifier,
    table: Option[Table],
    options: CaseInsensitiveStringMap,
    isStreaming: Boolean): Option[LogicalPlan] = {
  ...
  case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) =>
    if (isStreaming) {
      if (v1Table.v1Table.tableType == CatalogTableType.VIEW) {
        throw QueryCompilationErrors.permanentViewNotSupportedByStreamingReadingAPIError(
          ident.quoted)
      }
      SubqueryAlias(
        catalog.name +: ident.asMultipartIdentifier,
        UnresolvedCatalogRelation(v1Table.v1Table, options, isStreaming = true))
    } else {
      v1SessionCatalog.getRelation(v1Table.v1Table, options)
    }
  ...
  case table =>
    if (isStreaming) {
      ...
    } else {
      SubqueryAlias(
        catalog.name +: ident.asMultipartIdentifier,
        DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
    }
- If it is a V1Table, it is converted into an UnresolvedCatalogRelation, which the rule FindDataSourceTable later turns into a LogicalRelation. That step goes through lookupDataSource, which is where the registered datasource (such as "org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider" or "org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2", for which there is currently no matching case) takes effect, in the providingInstance() method.
- Otherwise, it is converted into a DataSourceV2Relation, and the rule V2ScanRelationPushDown then applies the series of push down optimizations (the explain comparison below illustrates the two paths).
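As a rough way to see which path a query actually took, the plans of the two paths can be compared (a hedged sketch; the table and catalog names are assumptions):

// DS V1 path: a table resolved through the default session catalog becomes an
// UnresolvedCatalogRelation and, after FindDataSourceTable, a LogicalRelation.
spark.sql("SELECT dept, SUM(salary) FROM jdbc_v1_table GROUP BY dept").explain(true)

// DS V2 path: the same query against a registered TableCatalog (here "h2") goes
// through DataSourceV2Relation and V2ScanRelationPushDown; with JDBCTableCatalog
// the scan node is expected to report the pushed filters/aggregates.
spark.sql("SELECT dept, SUM(salary) FROM h2.test.employee GROUP BY dept").explain(true)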
So for a JDBC catalog, if you want the DS V2 optimizations, you have to configure:
spark.sql.catalog.h2=org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
If you want the DS v2 optimizations for other sources, such as Parquet, you have to implement the corresponding TableCatalog and then configure it:
spark.sql.catalog.parquet=org.apache.spark.sql.execution.datasources.v2.jdbc.xxxx
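For reference, below is a hedged skeleton of what such a TableCatalog could look like (the class name is made up; a real implementation would return Table instances whose ScanBuilder implements push-down mix-ins such as SupportsPushDownFilters and SupportsPushDownAggregates, see JDBCTableCatalog for a complete example):

import java.util
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical skeleton of a custom TableCatalog; only the shape is shown and the
// bodies are placeholders to be filled in for a real source.
class MyParquetTableCatalog extends TableCatalog {
  private var catalogName: String = _
  private var catalogOptions: CaseInsensitiveStringMap = _

  // Called once with the name used in spark.sql.catalog.<name> and its options.
  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    catalogName = name
    catalogOptions = options
  }

  override def name(): String = catalogName

  override def listTables(namespace: Array[String]): Array[Identifier] =
    throw new UnsupportedOperationException("listTables not implemented yet")

  // The key method: return a Table whose ScanBuilder supports the push-down mix-ins.
  override def loadTable(ident: Identifier): Table =
    throw new UnsupportedOperationException("loadTable not implemented yet")

  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table =
    throw new UnsupportedOperationException("createTable not implemented yet")

  override def alterTable(ident: Identifier, changes: TableChange*): Table =
    throw new UnsupportedOperationException("alterTable not implemented yet")

  override def dropTable(ident: Identifier): Boolean = false

  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
    throw new UnsupportedOperationException("renameTable not implemented yet")
}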
About TableCatalog
Currently, the JDBC datasource and TableCatalog are already implemented in Spark:
## datasource
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
## TableCatalog
org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
If you want to implement another datasource and catalog, you can refer to the JDBC implementation (note that the current JDBC source implementation is still based on DS V1; a new one is better based on DS V2, see ParquetDataSourceV2 as a reference).
There is also a related discussion in SPARK-28396.
For more on the DS V2 push down features, see "Technology frontier | DS V2 push-down refactoring and new features in Spark 3.3.0".