Catalog Component Design and Custom Extended Catalog Implementation in Spark 3
2022-07-27 15:38:00 【wankunde】
Catalog Component Design in Spark 3
Catalog class hierarchy
In Spark 3, we can easily extend the built-in Hive metastore catalog with custom functionality. Let's first sort out the class design of the whole system, as well as the catalog initialization process.
Delta is an extended file storage format provided by Databricks, together with the corresponding SQL development interfaces. Let's look at the inheritance hierarchy of the DeltaCatalog class in the Delta project and the functions it implements.
CatalogPlugin: each independent namespace needs to implement this interface; the default implementation, V2SessionCatalog, has namespace = Array("default").
TableCatalog: defines the interface for table-related operations, including loadTable(), createTable(), alterTable(), dropTable(), etc.
V2SessionCatalog: Spark's default catalog implementation; it performs the actual table operations by delegating to a SessionCatalog object (which wraps the externalCatalog discussed below).
CatalogExtension: this interface also extends TableCatalog; its setDelegateCatalog() method sets the V2SessionCatalog instance above as the delegate.
DelegatingCatalogExtension: this class implements all the default methods of the abstract interface above; we can call the default implementation via `super.func(ident)` and then add the logic we want to implement on top.
DeltaCatalog: for the detailed implementation, refer to the sample code provided by the Delta project.
To sum up: if we want to implement a brand-new custom catalog, extending TableCatalog is enough; if we want to extend the functionality of an existing catalog, extend DelegatingCatalogExtension, which already provides default implementations of many methods, and override just the logic we want to change. A minimal sketch follows.
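As a hedged illustration (the class name and the property-inspection idea are assumptions made up for this example, not code from Delta or Spark), an extension of the session catalog could look like this:

```scala
import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, Identifier, Table}

// Minimal sketch: override loadTable() and delegate everything else to the
// wrapped V2SessionCatalog that Spark injects via setDelegateCatalog().
class MyExtendedCatalog extends DelegatingCatalogExtension {
  override def loadTable(ident: Identifier): Table = {
    val table = super.loadTable(ident) // default behavior through the delegate
    // e.g. inspect table.properties() here and return a wrapped custom Table
    table
  }
}
```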
Catalog Initialization Process
Spark can manage multiple catalogs at the same time through CatalogManager, and multiple catalogs can be registered via spark.sql.catalog.${name}. Spark's default catalog is specified by the spark.sql.catalog.spark_catalog parameter. The usual approach is to implement the custom catalog class by extending DelegatingCatalogExtension, and then point the spark.sql.catalog.spark_catalog parameter at that custom catalog class, as in the snippet below.
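For example (the com.example.* class names below are hypothetical placeholders, not real implementations), the registration could look like this when building the session:

```scala
import org.apache.spark.sql.SparkSession

// com.example.* class names are hypothetical placeholders.
val spark = SparkSession.builder()
  .appName("catalog-demo")
  // replace the default session catalog with a DelegatingCatalogExtension subclass
  .config("spark.sql.catalog.spark_catalog", "com.example.MyExtendedCatalog")
  // register an additional, independent catalog under the name "mycat"
  .config("spark.sql.catalog.mycat", "com.example.MyTableCatalog")
  .getOrCreate()
```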
Let's take a closer look at how the HiveExternalCatalog, v2SessionCatalog, spark_catalog, and related objects are instantiated and managed:
- SparkSession enables Hive support: SparkSession.enableHiveSupport() sets `CATALOG_IMPLEMENTATION = hive`. The catalog type of Spark SQL (the `spark.sql.catalogImplementation` parameter) supports `hive` and `in-memory` by default; if not specified, it defaults to `in-memory`. `session.sharedState.externalCatalog` is the catalog in a Spark session that is actually responsible for interacting with the external system; depending on the parameter above, a `HiveExternalCatalog` or an `InMemoryCatalog` instance is created.
- In BaseSessionStateBuilder/HiveSessionStateBuilder, the externalCatalog above is used to create the `catalog` object, and the `v2SessionCatalog` object is created from `catalog`.
- A CatalogManager instance is created from `catalog` and `v2SessionCatalog`. CatalogManager manages multiple catalogs through its `catalogs` object, a `Map[catalog name, catalog instance]`. The `defaultSessionCatalog` attribute of CatalogManager is the `v2SessionCatalog` object above. `CatalogManager.catalog(name: String)` returns a catalog instance by its name; if no such instance exists, one is instantiated via `Catalogs.load(name: String, ...)`. The `Catalogs.load(name: String, ...)` method loads the class configured as `spark.sql.catalog.${name}` in the conf, then instantiates and initializes the CatalogPlugin object.
- There is one special catalog, name = spark_catalog. When the `spark.sql.catalog.spark_catalog` parameter is empty (the default), the `defaultSessionCatalog` attribute of the CatalogManager is returned.
- If `spark.sql.catalog.spark_catalog` is configured, the instance loaded by `Catalogs.load()` above is inspected: if it turns out to be a CatalogExtension subclass, its `setDelegateCatalog()` method is called automatically, setting the CatalogManager's `defaultSessionCatalog` as its internal delegate (a simplified sketch follows below the list).
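A simplified sketch of that spark_catalog handling (paraphrased from Spark 3's CatalogManager; simplified, not verbatim source):

```scala
// Paraphrased sketch of CatalogManager's handling of spark_catalog;
// SESSION_CATALOG_NAME is "spark_catalog".
private def loadV2SessionCatalog(): CatalogPlugin = {
  Catalogs.load(SESSION_CATALOG_NAME, conf) match {
    case extension: CatalogExtension =>
      // a CatalogExtension automatically receives the built-in session
      // catalog as its internal delegate
      extension.setDelegateCatalog(defaultSessionCatalog)
      extension
    case other => other
  }
}
```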
The relevant implementation code is shown below.
BaseSessionStateBuilder instance initialization:
protected lazy val catalog: SessionCatalog = {
val catalog = new SessionCatalog(
() => session.sharedState.externalCatalog,
() => session.sharedState.globalTempViewManager,
functionRegistry,
conf,
SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
sqlParser,
resourceLoader)
parentState.foreach(_.catalog.copyStateTo(catalog))
catalog
}
protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog, conf)
protected lazy val catalogManager = new CatalogManager(conf, v2SessionCatalog, catalog)
HiveSessionStateBuilder extends BaseSessionStateBuilder
In the subclass, catalog returns a HiveSessionCatalog instance:
override protected lazy val catalog: HiveSessionCatalog = {
val catalog = new HiveSessionCatalog(
() => externalCatalog,
() => session.sharedState.globalTempViewManager,
new HiveMetastoreCatalog(session),
functionRegistry,
conf,
SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
sqlParser,
resourceLoader)
parentState.foreach(_.catalog.copyStateTo(catalog))
catalog
}
Testing a Custom Catalog
Every catalog has a name and holds an array of namespaces inside. By default, catalog name = spark_catalog and namespace = Array("default"). Let's test with a custom catalog.
Compile and package the test class
Note: the test class cannot be written directly in the spark-shell window. The REPL uses the tools under Scala's built-in scala.tools.nsc.interpreter package to compile and load code on the fly, but the resulting class file is rewritten so that the class no longer has a no-argument constructor, which causes problems here. The safer way is to compile the test class into a jar and then test.
package org.apache.spark.wankun.catalog
import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.util.CaseInsensitiveStringMap
// CatalogPlugin subclasses must provide a no-argument constructor;
// initialization work can be done in the initialize() method.
class DummyCatalog extends CatalogPlugin {
  private var _name: String = _

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    _name = name
  }

  override def name(): String = _name

  override def defaultNamespace(): Array[String] = Array("a", "b")
}
Test switching catalogs
// spark-shell --master local --jars /Users/wankun/ws/wankun/sbtstudy/target/scala-2.12/sbtstudy_2.12-1.0.jar
scala> spark.sessionState.catalogManager.currentCatalog
res0: org.apache.spark.sql.connector.catalog.CatalogPlugin = V2SessionCatalog(spark_catalog)
scala> spark.sessionState.catalogManager.currentCatalog.name
res1: String = spark_catalog
scala> spark.sessionState.catalogManager.currentNamespace
res2: Array[String] = Array(default)
scala>
scala> import org.apache.spark.wankun.catalog.DummyCatalog
import org.apache.spark.wankun.catalog.DummyCatalog
scala> spark.sessionState.conf.setConfString("spark.sql.catalog.dummy", classOf[DummyCatalog].getName)
scala> spark.sessionState.catalogManager.currentCatalog
res4: org.apache.spark.sql.connector.catalog.CatalogPlugin = V2SessionCatalog(spark_catalog)
scala> spark.sessionState.catalogManager.setCurrentCatalog("dummy")
scala> spark.sessionState.catalogManager.currentCatalog
res6: org.apache.spark.sql.connector.catalog.CatalogPlugin = org.apache.spark.wankun.catalog.DummyCatalog@...
scala> spark.sessionState.catalogManager.currentCatalog.name
res7: String = dummy
scala> spark.sessionState.catalogManager.currentNamespace
res8: Array[String] = Array(a, b)
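The same registration can also be done at launch time instead of via setConfString at runtime, in the style of the spark-shell command above:

```scala
// spark-shell --master local \
//   --jars /Users/wankun/ws/wankun/sbtstudy/target/scala-2.12/sbtstudy_2.12-1.0.jar \
//   --conf spark.sql.catalog.dummy=org.apache.spark.wankun.catalog.DummyCatalog
```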
Custom JDBC and Kafka Data Source Catalogs in Practice
The analysis above basically clarifies how custom extended catalogs are implemented in Spark SQL. During Spark SQL analysis, the ResolveTables rule automatically finds and loads the corresponding NonSessionCatalogAndIdentifier based on the catalog (the first part of the multipart name); that catalog then returns our custom Table according to the remaining namespace and name we pass in; finally, DataSourceV2Relation.create(table, Some(catalog), Some(ident)) returns a Relation object that wraps the Table (an illustrative trace follows the rule code below).
The rule code is as follows :
object ResolveTables extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
    case u: UnresolvedRelation =>
      lookupV2Relation(u.multipartIdentifier)
        .map { rel =>
          val ident = rel.identifier.get
          SubqueryAlias(rel.catalog.get.name +: ident.namespace :+ ident.name, rel)
        }.getOrElse(u)
    ...
  }

  /**
   * Performs the lookup of DataSourceV2 Tables from v2 catalog.
   */
  private def lookupV2Relation(identifier: Seq[String]): Option[DataSourceV2Relation] =
    expandRelationName(identifier) match {
      case NonSessionCatalogAndIdentifier(catalog, ident) =>
        CatalogV2Util.loadTable(catalog, ident) match {
          case Some(table) =>
            Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
          case None => None
        }
      case _ => None
    }
}
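To make the lookup concrete, here is an illustrative trace (the catalog name mycat, namespace ns1, and table tbl are assumptions for the example):

```scala
// Illustrative only: assumes a catalog was registered as spark.sql.catalog.mycat.
spark.sql("SELECT * FROM mycat.ns1.tbl")
// expandRelationName             -> Seq("mycat", "ns1", "tbl")
// NonSessionCatalogAndIdentifier -> catalog = catalogManager.catalog("mycat")
//                                   ident   = Identifier.of(Array("ns1"), "tbl")
// CatalogV2Util.loadTable(catalog, ident) returns our custom Table, which is
// wrapped by DataSourceV2Relation.create(table, Some(catalog), Some(ident))
```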
Finally, let's implement the custom JDBC and Kafka data source catalogs. For the detailed code, refer to the JDBC catalog implementation and the Kafka catalog implementation; a hedged skeleton of such a TableCatalog follows.
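As a taste of what such a catalog involves (everything below is an assumed skeleton with stubbed method bodies; the linked references contain the full versions):

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical skeleton of a JDBC-backed TableCatalog; method bodies are stubs.
class JdbcTableCatalog extends TableCatalog {
  private var catalogName: String = _
  private var options: CaseInsensitiveStringMap = _

  // options receives the spark.sql.catalog.<name>.* sub-keys,
  // e.g. the JDBC url, user, and password
  override def initialize(name: String, opts: CaseInsensitiveStringMap): Unit = {
    catalogName = name
    options = opts
  }

  override def name(): String = catalogName

  // resolve ident.namespace()/ident.name() against the remote database
  // and return a custom Table exposing its schema
  override def loadTable(ident: Identifier): Table = ???

  override def listTables(namespace: Array[String]): Array[Identifier] = ???

  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table = ???

  override def alterTable(ident: Identifier, changes: TableChange*): Table = ???

  override def dropTable(ident: Identifier): Boolean = ???

  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = ???
}
```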