Catalog component design and custom extended catalog implementation in Spark 3
2022-07-27 15:38:00 【wankunde】
Catalog component design in Spark 3
Catalog management class hierarchy
In Spark 3, we can easily extend the built-in Hive metastore catalog with custom functionality. Let's first sort out the class design of the whole system, as well as the catalog initialization process.
Delta is an extended file storage format provided by Databricks, which also comes with a related SQL development interface. Let's look at the class inheritance and implemented functionality of the DeltaCatalog class in the Delta project.
- CatalogPlugin: the interface that every standalone catalog needs to implement; the default implementation V2SessionCatalog has namespace = Array("default").
- TableCatalog: defines the interface for table-related operations, including loadTable(), createTable(), alterTable(), dropTable(), etc.
- V2SessionCatalog: Spark's default catalog implementation; it delegates to a SessionCatalog (that is, the externalCatalog discussed below) to carry out the actual table operations.
- CatalogExtension: also extends TableCatalog; the V2SessionCatalog instance above is handed to it as a delegate through its setDelegateCatalog() method.
- DelegatingCatalogExtension: implements all methods of the abstract interface above with default delegating implementations; subclasses can call the default behavior via `super.func(ident)` and then add the logic they want on top.
- DeltaCatalog: for the detailed implementation, see the sample code provided by Delta.
To sum up: if we want to implement a fully custom catalog, extending TableCatalog is enough; if we want to extend the functionality of an existing catalog, we should extend DelegatingCatalogExtension, which provides default implementations of many methods, so we only need to extend the logic we care about. A minimal sketch of this pattern follows.
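As a minimal sketch of this extension pattern (not the Delta source; the class name MyCatalog and the extra table property are made up for illustration), a custom catalog built on DelegatingCatalogExtension could intercept one operation and then fall back to the built-in session catalog via super:

package org.example

import java.util

import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, Identifier, Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

// Hypothetical example: add an extra table property before delegating
// table creation to the built-in V2SessionCatalog.
class MyCatalog extends DelegatingCatalogExtension {
  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table = {
    val newProps = new util.HashMap[String, String](properties)
    newProps.put("created.by", "MyCatalog") // made-up property, purely illustrative
    super.createTable(ident, schema, partitions, newProps)
  }
}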
Catalog initialization process
Spark manages multiple catalogs at the same time through CatalogManager, and you can register multiple catalogs via spark.sql.catalog.${name}. Spark's default catalog is specified by the spark.sql.catalog.spark_catalog parameter. The usual approach is to write a custom catalog class that extends DelegatingCatalogExtension and then point spark.sql.catalog.spark_catalog at that class.
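For example, registering catalogs through configuration can look like the following sketch (MyCatalog and MyJdbcCatalog are hypothetical class names; any spark.sql.catalog.<name> entry works the same way):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  // Replace the default spark_catalog with a DelegatingCatalogExtension subclass.
  .config("spark.sql.catalog.spark_catalog", "org.example.MyCatalog")
  // Register an additional, independent catalog under the name "my_jdbc".
  .config("spark.sql.catalog.my_jdbc", "org.example.MyJdbcCatalog")
  .enableHiveSupport()
  .getOrCreate()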
Let's take a closer look at how objects such as HiveExternalCatalog, v2SessionCatalog, and spark_catalog are instantiated and managed:
- SparkSession enables Hive support: SparkSession.enableHiveSupport() sets CATALOG_IMPLEMENTATION = hive. The Spark SQL catalog type (the spark.sql.catalogImplementation parameter) supports two values, hive and in-memory; if nothing is specified, the default is in-memory. session.sharedState.externalCatalog is the catalog that actually interacts with the external system on behalf of the Spark session; depending on the parameter above, either a HiveExternalCatalog or an InMemoryCatalog instance is created.
- In BaseSessionStateBuilder / HiveSessionStateBuilder, the externalCatalog above is used to build the catalog object, and the v2SessionCatalog object is then built from that catalog.
- A CatalogManager instance is created from catalog and v2SessionCatalog. CatalogManager manages multiple catalogs through a catalogs Map[catalog name, catalog instance]. Its defaultSessionCatalog attribute is the v2SessionCatalog object above. CatalogManager.catalog(name: String) returns a catalog instance by name; if no such instance exists yet, it is instantiated via Catalogs.load(name: String, ...), which loads the class configured under spark.sql.catalog.${name} in the conf and then instantiates and initializes the CatalogPlugin object.
- There is one special catalog with name = spark_catalog. When the spark.sql.catalog.spark_catalog parameter is empty (it is empty by default), the defaultSessionCatalog attribute of CatalogManager is returned.
- If spark.sql.catalog.spark_catalog is configured, the instance returned by Catalogs.load() above is inspected: if it is a CatalogExtension subclass, its setDelegateCatalog() method is called automatically, setting catalogManager's defaultSessionCatalog as its internal delegate. A simplified paraphrase of this resolution logic is shown below.
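The spark_catalog special case can be summarized by the following simplified paraphrase of CatalogManager's logic (condensed for readability, not the exact Spark source; catalogs, conf, and defaultSessionCatalog are the class members described above):

// If spark.sql.catalog.spark_catalog is unset, fall back to defaultSessionCatalog;
// otherwise load the configured class and, when it is a CatalogExtension,
// hand it the built-in session catalog as its delegate.
private def loadV2SessionCatalog(): CatalogPlugin = {
  Catalogs.load(SESSION_CATALOG_NAME, conf) match {
    case extension: CatalogExtension =>
      extension.setDelegateCatalog(defaultSessionCatalog)
      extension
    case other => other
  }
}

private def v2SessionCatalog: CatalogPlugin = {
  conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).map { _ =>
    catalogs.getOrElseUpdate(SESSION_CATALOG_NAME, loadV2SessionCatalog())
  }.getOrElse(defaultSessionCatalog)
}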
Relevant implementation code for reference:
BaseSessionStateBuilder Instance initialization
protected lazy val catalog: SessionCatalog = {
val catalog = new SessionCatalog(
() => session.sharedState.externalCatalog,
() => session.sharedState.globalTempViewManager,
functionRegistry,
conf,
SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
sqlParser,
resourceLoader)
parentState.foreach(_.catalog.copyStateTo(catalog))
catalog
}
protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog, conf)
protected lazy val catalogManager = new CatalogManager(conf, v2SessionCatalog, catalog)
HiveSessionStateBuilder extends BaseSessionStateBuilder
In the subclass, the catalog field returns a HiveSessionCatalog instance:
override protected lazy val catalog: HiveSessionCatalog = {
val catalog = new HiveSessionCatalog(
() => externalCatalog,
() => session.sharedState.globalTempViewManager,
new HiveMetastoreCatalog(session),
functionRegistry,
conf,
SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
sqlParser,
resourceLoader)
parentState.foreach(_.catalog.copyStateTo(catalog))
catalog
}
Testing a custom Catalog
Every catalog has a name and an array of namespaces inside it. By default, catalog name = spark_catalog and namespace = Array("default"). Let's test using a custom catalog.
Compile and package the test class
Note: the test class cannot be written directly in the spark-shell window. Under the hood the REPL uses the tools in Scala's built-in scala.tools.nsc.interpreter package to compile and load code on the fly, but the generated class files are rewritten in the process and the class no longer keeps a usable no-arg constructor, so loading it as a catalog plugin fails. The safer approach is to compile the test class into a jar and test with that (a minimal build.sbt sketch follows the class below).
package org.apache.spark.wankun.catalog
import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.util.CaseInsensitiveStringMap
// Subclasses of CatalogPlugin must provide a no-arg constructor; initialization can be done in the initialize() method
class DummyCatalog extends CatalogPlugin {
override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
_name = name
}
private var _name: String = null
override def name(): String = _name
override def defaultNamespace(): Array[String] = Array("a", "b")
}
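A minimal build.sbt for packaging the test class might look like this sketch (the Scala and Spark versions here are assumptions; match them to your environment):

// build.sbt -- minimal sketch, versions are assumptions
name := "sbtstudy"
version := "1.0"
scalaVersion := "2.12.15"

// "provided": spark-shell / spark-submit supplies Spark at runtime
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.1" % "provided"

Run sbt package and pass the resulting jar to spark-shell with --jars, as in the session below.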
Test: switching catalogs
// spark-shell --master local --jars /Users/wankun/ws/wankun/sbtstudy/target/scala-2.12/sbtstudy_2.12-1.0.jar
scala> spark.sessionState.catalogManager.currentCatalog
res0: org.apache.spark.sql.connector.catalog.CatalogPlugin = V2SessionCatalog(spark_catalog)
scala> spark.sessionState.catalogManager.currentCatalog.name
res1: String = spark_catalog
scala> spark.sessionState.catalogManager.currentNamespace
res2: Array[String] = Array(default)
scala>
scala> import org.apache.spark.wankun.catalog.DummyCatalog
import org.apache.spark.wankun.catalog.DummyCatalog
scala> spark.sessionState.conf.setConfString("spark.sql.catalog.dummy", classOf[DummyCatalog].getName)
scala> spark.sessionState.catalogManager.currentCatalog
res4: org.apache.spark.sql.connector.catalog.CatalogPlugin = V2SessionCatalog(spark_catalog)
scala> spark.sessionState.catalogManager.setCurrentCatalog("dummy")
scala> spark.sessionState.catalogManager.currentCatalog
res6: org.apache.spark.sql.connector.catalog.CatalogPlugin = org.apache.spark.wankun.catalog.DummyCatalog@...
scala> spark.sessionState.catalogManager.currentCatalog.name
res7: String = dummy
scala> spark.sessionState.catalogManager.currentNamespace
res8: Array[String] = Array(a, b)
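Instead of calling setConfString inside the session, the same catalog can be registered when the shell starts; this is a sketch of the equivalent invocation (the jar path is illustrative):

// spark-shell --master local \
//   --jars sbtstudy_2.12-1.0.jar \
//   --conf spark.sql.catalog.dummy=org.apache.spark.wankun.catalog.DummyCatalog

// Then switch to it exactly as above:
spark.sessionState.catalogManager.setCurrentCatalog("dummy")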
Custom JDBC and Kafka data source Catalogs in practice
The analysis above basically clarifies how custom extended catalogs are implemented in Spark SQL. During Spark SQL analysis, the ResolveTables rule automatically finds and loads the corresponding catalog based on the catalog name (the first part of the namespace), matching it as a NonSessionCatalogAndIdentifier; that catalog then returns our custom Table based on the remaining namespace and name we pass in; finally, DataSourceV2Relation.create(table, Some(catalog), Some(ident)) returns the Relation object wrapping the Table.
The rule code is as follows:
object ResolveTables extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
    case u: UnresolvedRelation =>
      lookupV2Relation(u.multipartIdentifier)
        .map { rel =>
          val ident = rel.identifier.get
          SubqueryAlias(rel.catalog.get.name +: ident.namespace :+ ident.name, rel)
        }.getOrElse(u)
    ...
  }

  /** Performs the lookup of DataSourceV2 Tables from v2 catalog. */
  private def lookupV2Relation(identifier: Seq[String]): Option[DataSourceV2Relation] =
    expandRelationName(identifier) match {
      case NonSessionCatalogAndIdentifier(catalog, ident) =>
        CatalogV2Util.loadTable(catalog, ident) match {
          case Some(table) =>
            Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
          case None => None
        }
      case _ => None
    }
}
Finally, let's implement the custom JDBC and Kafka data source catalogs. For the detailed code, see the JDBC Catalog implementation and the Kafka Catalog implementation; a rough skeleton follows.
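As a rough, hedged skeleton only (not the referenced implementations; JdbcTable is a hypothetical Table implementation assumed to exist elsewhere), a standalone JDBC catalog just needs to implement TableCatalog and map identifiers to tables backed by the external system:

import java.util

import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical skeleton of a standalone JDBC catalog.
class SimpleJdbcCatalog extends TableCatalog {
  private var catalogName: String = _
  private var options: CaseInsensitiveStringMap = _

  // Connection options arrive as spark.sql.catalog.<name>.<key> entries.
  override def initialize(name: String, opts: CaseInsensitiveStringMap): Unit = {
    catalogName = name
    options = opts
  }

  override def name(): String = catalogName

  override def listTables(namespace: Array[String]): Array[Identifier] =
    Array.empty // query the database's information_schema here

  override def loadTable(ident: Identifier): Table =
    new JdbcTable(ident, options) // JdbcTable: hypothetical Table backed by JDBC

  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table =
    throw new UnsupportedOperationException("out of scope for this sketch")

  override def alterTable(ident: Identifier, changes: TableChange*): Table =
    throw new UnsupportedOperationException("out of scope for this sketch")

  override def dropTable(ident: Identifier): Boolean = false

  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
    throw new UnsupportedOperationException("out of scope for this sketch")
}

Once such a class is registered under, say, spark.sql.catalog.jdbc, a query like SELECT * FROM jdbc.db.tbl is resolved through exactly the ResolveTables flow shown above.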