Catalog component design and custom extended catalog implementation in Spark 3
2022-07-27 15:38:00 【wankunde】
Catalog component design in Spark 3
Catalog management class hierarchy
In Spark 3, we can easily extend the catalog that fronts the built-in Hive metastore with custom functionality. Let's first sort out the class design of the whole mechanism, as well as the catalog initialization process.
Delta is an extended file storage format provided by Databricks, and it also offers a SQL development interface. Let's look at how the DeltaCatalog class in the Delta project fits into the inheritance hierarchy and what it implements.
- CatalogPlugin: the base interface that every independent catalog must implement. The default implementation, V2SessionCatalog, has namespace = Array("default").
- TableCatalog: defines the interface for table-related operations, including loadTable(), createTable(), alterTable(), dropTable(), etc.
- V2SessionCatalog: Spark's default catalog implementation. By delegating to a SessionCatalog object (which in turn wraps the externalCatalog described below), it performs the actual table operations.
- CatalogExtension: this interface also extends TableCatalog; its setDelegateCatalog() method hands it the V2SessionCatalog instance above as its delegate.
- DelegatingCatalogExtension: this class provides default implementations of all the methods of the interface above; we can call the default behavior via `super.func(ident)` and then add the logic we want on top of it.
- DeltaCatalog: for a detailed implementation, see the sample code provided by Delta.
To sum up: if we want to implement a brand-new custom catalog, it is enough to implement the TableCatalog interface; if we want to extend the functionality of the existing catalog, we extend DelegatingCatalogExtension, which already provides default implementations of many methods, and override only the pieces whose logic we want to extend.
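As a rough illustration of the second approach, here is a minimal sketch of a catalog extension (the class name and the extension logic are hypothetical, not Delta's actual code): reuse the default behavior through super, then add custom logic around it.

import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, Identifier, Table}

// Hypothetical extension: DelegatingCatalogExtension already implements every
// method by forwarding to the delegate set via setDelegateCatalog().
class MyCatalogExtension extends DelegatingCatalogExtension {

  override def loadTable(ident: Identifier): Table = {
    // First reuse the default implementation (delegates to V2SessionCatalog).
    val table = super.loadTable(ident)
    // Custom logic would go here, e.g. rewriting the table based on its provider.
    table
  }
}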
Catalog initialization process
Through CatalogManager, Spark can manage multiple catalogs at the same time; additional catalogs are registered via spark.sql.catalog.${name}. Spark's default catalog is specified by the spark.sql.catalog.spark_catalog parameter. The usual approach is to write a custom catalog class that extends DelegatingCatalogExtension, and then point the spark.sql.catalog.spark_catalog parameter at that class, as illustrated below.
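For example, registration could look like this when building the session (the class names here are placeholders for illustration only):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  // Replace the built-in session catalog with a custom extension (placeholder class name).
  .config("spark.sql.catalog.spark_catalog", "com.example.MyCatalogExtension")
  // Register an additional catalog named "my_jdbc"; keys under this prefix are
  // passed to its initialize(name, options) method as options.
  .config("spark.sql.catalog.my_jdbc", "com.example.MyJdbcCatalog")
  .config("spark.sql.catalog.my_jdbc.url", "jdbc:mysql://localhost:3306/test")
  .enableHiveSupport()
  .getOrCreate()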
Let's take a closer look at how HiveExternalCatalog, v2SessionCatalog, spark_catalog, and the related objects are instantiated and managed.
- SparkSession enables Hive support. SparkSession.enableHiveSupport() sets the parameter CATALOG_IMPLEMENTATION = hive. Spark SQL's catalog type (the spark.sql.catalogImplementation parameter) supports hive and in-memory; if not specified, the default is in-memory. session.sharedState.externalCatalog is the catalog that actually talks to the external system for the Spark session; depending on the parameter above, either a HiveExternalCatalog or an InMemoryCatalog instance is created.
- BaseSessionStateBuilder / HiveSessionStateBuilder uses the externalCatalog above to create the catalog object, and from catalog creates the v2SessionCatalog object.
- The CatalogManager instance is created from catalog and v2SessionCatalog. CatalogManager manages multiple catalogs through its catalogs field, a Map[catalog name, catalog instance]. Its defaultSessionCatalog attribute is the v2SessionCatalog above. CatalogManager.catalog(name: String) returns the catalog instance for a given name; if no such instance exists yet, it is created via Catalogs.load(name: String, ...), which loads the class configured by spark.sql.catalog.${name} in the conf, then instantiates and initializes the CatalogPlugin object.
- There is one special catalog with name = spark_catalog. When the spark.sql.catalog.spark_catalog parameter is empty (the default), the defaultSessionCatalog attribute of the CatalogManager is returned.
- If spark.sql.catalog.spark_catalog has been configured, the instance loaded by Catalogs.load() above is inspected; if it turns out to be a CatalogExtension subclass, its setDelegateCatalog() method is called automatically so that the CatalogManager's defaultSessionCatalog becomes its internal delegate (sketched below).
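The spark_catalog resolution described above can be condensed into the following sketch. This is a simplified paraphrase of CatalogManager's behavior, not the verbatim Spark source, and the classes used here are Spark internals that may not be accessible from user code:

import org.apache.spark.sql.connector.catalog.{CatalogExtension, CatalogPlugin, Catalogs}
import org.apache.spark.sql.internal.SQLConf

// Simplified paraphrase of how CatalogManager resolves the "spark_catalog" name.
def resolveSparkCatalog(conf: SQLConf, defaultSessionCatalog: CatalogPlugin): CatalogPlugin =
  if (!conf.contains("spark.sql.catalog.spark_catalog")) {
    // Nothing configured: fall back to the built-in V2SessionCatalog wrapper.
    defaultSessionCatalog
  } else {
    Catalogs.load("spark_catalog", conf) match {
      case extension: CatalogExtension =>
        // A CatalogExtension gets the built-in catalog injected as its delegate.
        extension.setDelegateCatalog(defaultSessionCatalog)
        extension
      case other => other
    }
  }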
The relevant implementation code is shown below.
BaseSessionStateBuilder instance initialization:
protected lazy val catalog: SessionCatalog = {
  val catalog = new SessionCatalog(
    () => session.sharedState.externalCatalog,
    () => session.sharedState.globalTempViewManager,
    functionRegistry,
    conf,
    SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
    sqlParser,
    resourceLoader)
  parentState.foreach(_.catalog.copyStateTo(catalog))
  catalog
}

protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog, conf)

protected lazy val catalogManager = new CatalogManager(conf, v2SessionCatalog, catalog)
HiveSessionStateBuilder extends BaseSessionStateBuilder
In this subclass, catalog returns a HiveSessionCatalog instance:
override protected lazy val catalog: HiveSessionCatalog = {
  val catalog = new HiveSessionCatalog(
    () => externalCatalog,
    () => session.sharedState.globalTempViewManager,
    new HiveMetastoreCatalog(session),
    functionRegistry,
    conf,
    SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
    sqlParser,
    resourceLoader)
  parentState.foreach(_.catalog.copyStateTo(catalog))
  catalog
}
Testing a custom catalog
Every catalog has a name and holds an array of namespaces. By default, catalog name = spark_catalog and namespace = Array("default"). Let's test with a custom catalog.
Compile and package the test class
Note: the test class cannot be written directly in the spark-shell window. Under the hood the REPL uses the tools in Scala's built-in scala.tools.nsc.interpreter package to compile and load code on the fly, but the generated class file is rewritten in the process and the class no longer has a usable no-arg constructor, which causes problems. The safer way is to compile the test class into a jar and then run the test (a minimal sbt sketch follows the class below).
package org.apache.spark.wankun.catalog
import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.util.CaseInsensitiveStringMap
// CatalogPlugin implementations must provide a no-arg constructor;
// initialization work can be done in the initialize() method.
class DummyCatalog extends CatalogPlugin {
  private var _name: String = null

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    _name = name
  }

  override def name(): String = _name

  override def defaultNamespace(): Array[String] = Array("a", "b")
}
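A minimal sbt setup for packaging this class into a jar might look like the following (the build file contents are an assumption matching the jar name used below; adjust the Scala and Spark versions to your environment). Running `sbt package` then produces the jar passed to spark-shell via --jars:

// build.sbt (assumed layout; versions are examples)
name := "sbtstudy"
version := "1.0"
scalaVersion := "2.12.15"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.2" % "provided"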
Switching catalogs in the test:
// spark-shell --master local --jars /Users/wankun/ws/wankun/sbtstudy/target/scala-2.12/sbtstudy_2.12-1.0.jar
scala> spark.sessionState.catalogManager.currentCatalog
res0: org.apache.spark.sql.connector.catalog.CatalogPlugin = V2SessionCatalog(spark_catalog)
scala> spark.sessionState.catalogManager.currentCatalog.name
res1: String = spark_catalog
scala> spark.sessionState.catalogManager.currentNamespace
res2: Array[String] = Array(default)
scala>
scala> import org.apache.spark.wankun.catalog.DummyCatalog
import org.apache.spark.wankun.catalog.DummyCatalog
scala> spark.sessionState.conf.setConfString("spark.sql.catalog.dummy", classOf[DummyCatalog].getName)
scala> spark.sessionState.catalogManager.currentCatalog
res4: org.apache.spark.sql.connector.catalog.CatalogPlugin = V2SessionCatalog(spark_catalog)
scala> spark.sessionState.catalogManager.setCurrentCatalog("dummy")
scala> spark.sessionState.catalogManager.currentCatalog
res6: org.apache.spark.sql.connector.catalog.CatalogPlugin = org.apache.spark.wankun.catalog.DummyCatalog@...
scala> spark.sessionState.catalogManager.currentCatalog.name
res7: String = dummy
scala> spark.sessionState.catalogManager.currentNamespace
res8: Array[String] = Array(a, b)
Custom JDBC and Kafka data source catalogs in practice
The analysis above basically clarifies how custom extended catalogs are implemented in Spark SQL. During Spark SQL analysis, the ResolveTables rule automatically finds and loads the corresponding NonSessionCatalogAndIdentifier based on the catalog (the first namespace part); that catalog then returns our custom Table according to the remaining namespace and name we pass in; finally, DataSourceV2Relation.create(table, Some(catalog), Some(ident)) returns a Relation object wrapping the Table.
The rule code is as follows:
object ResolveTables extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
    case u: UnresolvedRelation =>
      lookupV2Relation(u.multipartIdentifier)
        .map { rel =>
          val ident = rel.identifier.get
          SubqueryAlias(rel.catalog.get.name +: ident.namespace :+ ident.name, rel)
        }.getOrElse(u)
    ...
  }

  /**
   * Performs the lookup of DataSourceV2 Tables from v2 catalog.
   */
  private def lookupV2Relation(identifier: Seq[String]): Option[DataSourceV2Relation] =
    expandRelationName(identifier) match {
      case NonSessionCatalogAndIdentifier(catalog, ident) =>
        CatalogV2Util.loadTable(catalog, ident) match {
          case Some(table) =>
            Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
          case None => None
        }
      case _ => None
    }
}
Finally, let's implement custom JDBC and Kafka data source catalogs. For the detailed code, refer to the JDBC catalog implementation and the Kafka catalog implementation.
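Since the referenced code is not reproduced in this post, the following is only a skeleton of what a JDBC-backed TableCatalog could look like; all names and the connection-option key are hypothetical, and the JDBC-specific logic is left as comments:

import java.util
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical skeleton of a JDBC-backed catalog; only the structure is shown.
class SimpleJdbcCatalog extends TableCatalog {
  private var catalogName: String = _
  private var url: String = _

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    catalogName = name
    // Options come from the spark.sql.catalog.${name}.* entries, e.g. the JDBC url.
    url = options.get("url")
  }

  override def name(): String = catalogName

  override def listTables(namespace: Array[String]): Array[Identifier] = {
    // Would query the database's information schema for tables in `namespace`.
    Array.empty
  }

  override def loadTable(ident: Identifier): Table = {
    // Would read the JDBC table's schema and return a Table whose scan builder
    // issues JDBC reads; omitted in this sketch.
    throw new UnsupportedOperationException("loadTable not implemented in this sketch")
  }

  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table = {
    throw new UnsupportedOperationException("createTable not implemented in this sketch")
  }

  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
    throw new UnsupportedOperationException("alterTable not implemented in this sketch")
  }

  override def dropTable(ident: Identifier): Boolean = false

  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
    throw new UnsupportedOperationException("renameTable not implemented in this sketch")
  }
}

Registered as, say, spark.sql.catalog.my_jdbc, a query such as SELECT * FROM my_jdbc.db.tbl would then be resolved through the ResolveTables path shown above.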