当前位置:网站首页>技术干货 | Linkis实践:新引擎实现流程解析
技术干货 | Linkis实践:新引擎实现流程解析
2022-06-09 14:35:00 【InfoQ】
目录导读
1. Linkis 新引擎功能代码实现
1.1 新建一个 maven 模块,并引入 ECP 的 maven 依赖

<dependency>
<groupId>org.apache.linkis</groupId>
<artifactId>linkis-engineconn-plugin-core</artifactId>
<version>${linkis.version}</version>
</dependency>
<!-- 以及一些其他所需依赖的maven配置 -->1.2 实现 ECP 的主要接口
- EngineConnPlugin:启动 EngineConn 时,先找到对应的 EngineConnPlugin 类,以此为入口,获取其它核心接口的实现,是必须实现的主要接口。
- EngineConnFactory:实现如何启动一个引擎连接器,和如何启动一个引擎执行器的逻辑,是必须实现的接口。
- 实现 createEngineConn 方法:返回一个 EngineConn 对象,其中,getEngine 返回一个封装了与底层引擎连接信息的对象,同时包含 Engine 类型信息。
- 对于只支持单一计算场景的引擎,继承 SingleExecutorEngineConnFactory,实现 createExecutor,返回对应的 Executor。
- 对于支持多计算场景的引擎,需要继承 MultiExecutorEngineConnFactory,并为每种计算类型实现一个 ExecutorFactory。EngineConnPlugin 会通过反射获取所有的 ExecutorFactory,根据实际情况返回对应的 Executor。
- EngineConnResourceFactory:用于限定启动一个引擎所需要的资源,引擎启动前,将以此为依 据 向 Linkis Manager 申 请 资 源。非必须,默认可以使用 GenericEngineResourceFactory。
- EngineLaunchBuilder:用于封装 EngineConnManager 可以解析成启动命令的必要信息。非必须,可以直接继承 JavaProcessEngineConnLaunchBuilder。
1.3 实现引擎 Executor 执行器逻辑
- SensibleExecutor:
- Executor 存在多种状态,允许 Executor 切换状态
- Executor 切换状态后,允许做通知等操作
- YarnExecutor:指 Yarn 类型的引擎,能够获取得到 applicationId 和 applicationURL 和队列。
- ResourceExecutor:指引擎具备资源动态变化的能力,配合提供 requestExpectedResource 方法,用于每次希望更改资源时,先向 RM 申请新的资源;而 resourceUpdate 方法,用于每次引擎实际使用资源发生变化时,向 RM 汇报资源情况。
- AccessibleExecutor:是一个非常重要的 Executor 基类。如果用户的 Executor 继承了该基类,则表示该 Engine 是可以被访问的。这里需区分 SensibleExecutor 的 state()和 AccessibleExecutor 的 getEngineStatus()方法:state()用于获取引擎状态,getEngineStatus()会获取引擎的状态、负载、并发等基础指标 Metric 数据。
- 同时,如果继承了 AccessibleExecutor,会触发 Engine 进程实例化多个 EngineReceiver 方法。EngineReceiver 用于处理 Entrance、EM 和 LinkisMaster 的 RPC 请求,使得该引擎变成了一个可被访问的引擎,用户如果有特殊的 RPC 需求,可以通过实现 RPCService 接口,进而实现与 AccessibleExecutor 通信。
- ExecutableExecutor:是一个常驻型的 Executor 基类,常驻型的 Executor 包含:生产中心的 Streaming 应用、提交给 Schedulis 后指定要以独立模式运行的脚本、业务用户的业务应用等。
- StreamingExecutor:Streaming 为流式应用,继承自 ExecutableExecutor,需具备诊断、do checkpoint、采集作业信息、监控告警的能力。
- ComputationExecutor:是常用的交互式引擎 Executor,处理交互式执行任务,并且具备状态查询、任务 kill 等交互式能力。
1.4 引擎功能实现的实际案例
<dependency>
<groupId>org.apache.linkis</groupId>
<artifactId>linkis-computation-engineconn</artifactId>
<version>${linkis.version}</version>
</dependency>
- createEngineConn:创建了一个包含 UserGroupInformation、SessionState 和 HiveConf 的对象,作为与底层引擎的连接信息的封装,set 到 EngineConn 对象中返回。
- createExecutor:根据当前的引擎连接信息,创建一个 HiveEngineConnExecutor 执行器对象。
2. 以实际扩展引擎为例详解新引擎实现的后续步骤
2.1 引擎代码准备
JDBCEngineConnExecutorConcurrentComputationExecutorKyuubiJDBCEngineConnExecutorComputationExecutorobject AMConfiguration {
// 如果你的引擎是多用户并发引擎,那么这个配置项需要关注下
val MULTI_USER_ENGINE_TYPES = CommonVars("wds.linkis.multi.user.engine.types", "jdbc,ck,es,io_file,appconn")
private def getDefaultMultiEngineUser(): String = {
// 此处应该是为了设置并发引擎拉起时的启动用户,默认jvmUser即是引擎服务Java进程的启动用户
val jvmUser = Utils.getJvmUser
s"""{jdbc:"$jvmUser", presto: "$jvmUser", kyuubi: "$jvmUser", es: "$jvmUser", ck:"$jvmUser", appconn:"$jvmUser", io_file:"root"}"""
}
}2.2 新引擎类型扩展
ComputationSingleExecutorEngineConnFactoryKyuubiJDBCEngineConnFactoryoverride protected def getEngineConnType: EngineType = EngineType.KYUUBI
override protected def getRunType: RunType = RunType.KYUUBI// EngineType中类似已存在引擎的变量定义,增加Kyuubi相关变量或代码
object EngineType extends Enumeration with Logging {
val KYUUBI = Value("kyuubi")
}
def mapStringToEngineType(str: String): EngineType = str match {
case _ if KYUUBI.toString.equalsIgnoreCase(str) => KYUUBI
}
// RunType中
object RunType extends Enumeration {
val KYUUBI = Value("kyuubi")
}2.3 Kyuubi 引擎标签中的版本号设置
// 在LabelCommonConfig中增加kyuubi的version配置
public class LabelCommonConfig {
public final static CommonVars<String> KYUUBI_ENGINE_VERSION = CommonVars.apply("wds.linkis.kyuubi.engine.version", "1.4");
}
// 在EngineTypeLabelCreator的init方法中补充kyuubi的匹配逻辑
// 如果这一步不做,代码提交到引擎上时,引擎标签信息中会缺少版本号
public class EngineTypeLabelCreator {
private static void init() {
defaultVersion.put(EngineType.KYUUBI().toString(), LabelCommonConfig.KYUUBI_ENGINE_VERSION.getValue());
}
}2.4 允许 Scripts 中打开 kyuubi 类型的脚本文件
// FileSource.scala中
object FileSource {
private val fileType = Array("......", "kyuubi")
}2.5 配置 kyuubi 脚本变量存储和解析

// QLScriptCompaction.scala
class QLScriptCompaction private extends CommonScriptCompaction{
override def belongTo(suffix: String): Boolean = {
suffix match {
...
case "kyuubi" => true
case _ => false
}
}
}
// QLScriptParser.scala
class QLScriptParser private extends CommonScriptParser {
override def belongTo(suffix: String): Boolean = {
suffix match {
case "kyuubi" => true
case _ => false
}
}
}
// CustomVariableUtils.scala中
object CustomVariableUtils extends Logging {
def replaceCustomVar(jobRequest: JobRequest, runType: String): (Boolean, String) = {
runType match {
......
case "hql" | "sql" | "fql" | "jdbc" | "hive"| "psql" | "presto" | "ck" | "kyuubi" => codeType = SQL_TYPE
case _ => return (false, code)
}
}
}// 通过CodeAndRunTypeUtils工具类中的CODE_TYPE_AND_RUN_TYPE_RELATION变量来维护codeType和runType间的对应关系
val CODE_TYPE_AND_RUN_TYPE_RELATION = CommonVars("wds.linkis.codeType.runType.relation", "sql=>sql|hql|jdbc|hive|psql|fql|kyuubi,python=>python|py|pyspark,java=>java,scala=>scala,shell=>sh|shell")2.6 ujes.client 中增加新引擎类型
/ JobExecuteAction.scala中
object EngineType {
......
val KYUUBI = new EngineType {
override val toString: String = "kyuubi"
val KYUUBI_RunType = new RunType {
override val toString: String = "kyuubi"
}
override def getDefaultRunType: RunType = KYUUBI_RunType
}
}
// UJESClientUtils.scala中
object UJESClientUtils {
def toEngineType(engineType: String): EngineType = engineType match {
......
case "kyuubi" => EngineType.KYUUBI
case _ => EngineType.SPARK
}
def toRunType(runType:String, engineType: EngineType) : RunType = runType match {
......
case "kyuuubi" => EngineType.KYUUBI.KYUUBI_RunType
case _ => EngineType.SPARK.SQL
}
}2.7 Linkis 管理员台界面引擎管理器中加入新增引擎文字提示或图标
methods: {
calssifyName(params) {
switch (params) {
case 'kyuubi':
return 'Kyuubi';
......
}
}
// 图标过滤
supportIcon(item) {
const supportTypes = [
......
{ rule: 'kyuubi', logo: 'fi-kyuubi' },
];
}
}
2.8 引擎的编译打包和安装部署
cd /Users/leojie/intsig_project/intsiglinkis/linkis-engineconn-plugins/engineconn-plugins/kyuubi
mvn clean install -DskipTests<!--kyuubi-->
<fileSets>
......
<fileSet>
<directory>
../../linkis-engineconn-plugins/engineconn-plugins/kyuubi/target/out/
</directory>
<outputDirectory>lib/linkis-engineconn-plugins/</outputDirectory>
<includes>
<include>**/*</include>
</includes>
</fileSet>
</fileSets>mvn clean install -DskipTests
2.9 引擎数据库配置
SET @KYUUBI_LABEL="kyuubi-1.4";
SET @KYUUBI_ALL=CONCAT('*-*,',@KYUUBI_LABEL);
SET @KYUUBI_IDE=CONCAT('*-IDE,',@KYUUBI_LABEL);
SET @KYUUBI_NODE=CONCAT('*-nodeexecution,',@KYUUBI_LABEL);
-- kyuubi
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.rm.instance', '范围:1-20,单位:个', 'kyuubi引擎最大并发数', '2', 'NumInterval', '[1,20]', '0', '0', '1', '队列资源', 'kyuubi');
insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.kyuubi.jdbc.connect.url', '例如:jdbc:hive2://127.0.0.1:10000', 'jdbc连接地址', '\"jdbc:hive2://127.0.0.1:10009/;[email protected]\"', 'None', '', '0', '0', '1', '数据源配置', 'kyuubi');
insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.kyuubi.jdbc.version', '取值范围:jdbc3,jdbc4', 'jdbc版本','jdbc4', 'OFT', '[\"jdbc3\",\"jdbc4\"]', '0', '0', '1', '数据源配置', 'kyuubi');
insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.kyuubi.jdbc.connect.max', '范围:1-20,单位:个', 'jdbc引擎最大连接数', '10', 'NumInterval', '[1,20]', '0', '0', '1', '数据源配置', 'kyuubi');
insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.kyuubi.jdbc.auth.type', '取值范围:SIMPLE,USERNAME,KERBEROS', 'jdbc认证方式', 'KERBEROS', 'OFT', '[\"SIMPLE\",\"USERNAME\",\"KERBEROS\"]', '0', '0', '1', '用户配置', 'kyuubi');
insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.kyuubi.jdbc.username', 'username', '数据库连接用户名', '', 'None', '', '0', '0', '1', '用户配置', 'kyuubi');
insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.kyuubi.jdbc.password', 'password', '数据库连接密码', '', 'None', '', '0', '0', '1', '用户配置', 'kyuubi');
insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.kyuubi.jdbc.principal', '例如:hadoop/[email protected]', '用户principal', 'hadoop/[email protected]', 'None', '', '0', '0', '1', '用户配置', 'kyuubi');
insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.kyuubi.jdbc.keytab.location', '例如:/data/keytab/hadoop.keytab', '用户keytab文件路径', '/data/keytab/hadoop.keytab', 'None', '', '0', '0', '1', '用户配置', 'kyuubi');
insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.kyuubi.jdbc.proxy.user.property', '例如:hive.server2.proxy.user', '用户代理配置', 'hive.server2.proxy.user', 'None', '', '0', '0', '1', '用户配置', 'kyuubi');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.java.driver.cores', '取值范围:1-8,单位:个', 'kyuubi引擎初始化核心个数', '1', 'NumInterval', '[1,8]', '0', '0', '1', 'kyuubi引擎设置', 'kyuubi');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.java.driver.memory', '取值范围:1-8,单位:G', 'kyuubi引擎初始化内存大小', '1g', 'Regex', '^([1-8])(G|g)$', '0', '0', '1', 'kyuubi引擎设置', 'kyuubi');
insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType',@KYUUBI_ALL, 'OPTIONAL', 2, now(), now());
insert into `linkis_ps_configuration_key_engine_relation` (`config_key_id`, `engine_type_label_id`)
(select config.id as `config_key_id`, label.id AS `engine_type_label_id` FROM linkis_ps_configuration_config_key config INNER JOIN linkis_cg_manager_label label ON config.engine_conn_type = 'kyuubi' and label_value = @KYUUBI_ALL);
insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType',@KYUUBI_IDE, 'OPTIONAL', 2, now(), now());
insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType',@KYUUBI_NODE, 'OPTIONAL', 2, now(), now());
select @label_id := id from linkis_cg_manager_label where `label_value` = @KYUUBI_IDE;
insert into linkis_ps_configuration_category (`label_id`, `level`) VALUES (@label_id, 2);
select @label_id := id from linkis_cg_manager_label where `label_value` = @KYUUBI_NODE;
insert into linkis_ps_configuration_category (`label_id`, `level`) VALUES (@label_id, 2);
-- jdbc default configuration
insert into `linkis_ps_configuration_config_value` (`config_key_id`, `config_value`, `config_label_id`) (select `relation`.`config_key_id` AS `config_key_id`, '' AS `config_value`, `relation`.`engine_type_label_id` AS `config_label_id` FROM linkis_ps_configuration_key_engine_relation relation INNER JOIN linkis_cg_manager_label label ON relation.engine_type_label_id = label.id AND label.label_value = @KYUUBI_ALL);-- 清除kyuubi引擎的初始化数据
SET @KYUUBI_LABEL="kyuubi-1.4";
SET @KYUUBI_ALL=CONCAT('*-*,',@KYUUBI_LABEL);
SET @KYUUBI_IDE=CONCAT('*-IDE,',@KYUUBI_LABEL);
SET @KYUUBI_NODE=CONCAT('*-nodeexecution,',@KYUUBI_LABEL);
delete from `linkis_ps_configuration_config_value` where `config_label_id` in
(select `relation`.`engine_type_label_id` AS `config_label_id` FROM `linkis_ps_configuration_key_engine_relation` relation INNER JOIN `linkis_cg_manager_label` label ON relation.engine_type_label_id = label.id AND label.label_value = @KYUUBI_ALL);
delete from `linkis_ps_configuration_key_engine_relation`
where `engine_type_label_id` in
(select label.id FROM `linkis_ps_configuration_config_key` config
INNER JOIN `linkis_cg_manager_label` label
ON config.engine_conn_type = 'kyuubi' and label_value = @KYUUBI_ALL);
delete from `linkis_ps_configuration_category`
where `label_id` in (select id from `linkis_cg_manager_label` where `label_value` in (@KYUUBI_IDE, @KYUUBI_NODE));
delete from `linkis_ps_configuration_config_key` where `engine_conn_type` = 'kyuubi';
delete from `linkis_cg_manager_label` where `label_value` in (@KYUUBI_ALL, @KYUUBI_IDE, @KYUUBI_NODE);
2.10 DSS Scripts 中新增脚本类型以及图标等信息
2.10.1 scriptis.js
{
rule: /\.kyuubi$/i,
// 语法高亮
lang: 'hql',
// 是否可执行
executable: true,
// application
application: 'kyuubi',
// 运行类型
runType: 'kyuubi',
// 脚本扩展名
ext: '.kyuubi',
// 脚本类型
scriptType: 'kyuubi',
abbr: 'kyuubi',
// 图标
logo: 'fi-kyuubi',
// 配色
color: '#FF6666',
// 是否可被新建
isCanBeNew: true,
label: 'Kyuubi',
// 是否可被打开
isCanBeOpen: true,
// 工作流类型
flowType: 'kyuubi'
},2.10.2 脚本复制支持
copyName() {
let typeArr = ['......', 'kyuubi']
}2.10.3 logo 与字体配色
data() {
return {
isHover: false,
iconColor: {
'fi-kyuubi': '#FF6666',
},
};
},let logoList = [
{ rule: /\.kyuubi$/i, logo: 'fi-kyuubi' },
];export const supportTypes = [
// 这里大概没用到
{ rule: /\.kyuubi$/i, logo: 'fi-kyuubi' },
]methods: {
calssifyName(params) {
switch (params) {
case 'kyuubi':
return 'Kyuubi';
......
}
}
// 图标过滤
supportIcon(item) {
const supportTypes = [
......
{ rule: 'kyuubi', logo: 'fi-kyuubi' },
];
}
}.fi-kyuubi:before {
content: "\e75e";
}
2.11 DSS 的工作流适配

# 引擎任务节点基本信息定义
insert into `dss_workflow_node` (`id`, `name`, `appconn_name`, `node_type`, `jump_url`, `support_jump`, `submit_to_scheduler`, `enable_copy`, `should_creation_before_node`, `icon`) values('18','kyuubi','-1','linkis.kyuubi.kyuubi',NULL,'1','1','1','0','svg文件');
# svg文件对应新引擎任务节点图标
# 引擎任务节点分类划分
insert into `dss_workflow_node_to_group`(`node_id`,`group_id`) values (18, 2);
# 引擎任务节点的基本信息(参数属性)绑定
INSERT INTO `dss_workflow_node_to_ui`(`workflow_node_id`,`ui_id`) VALUES (18,45);
# 在dss_workflow_node_ui表中定义了引擎任务节点相关的基本信息,然后以表单的形式在上图右侧中展示,你可以为新引擎扩充其他基础信息,然后自动被右侧表单渲染。import kyuubi from '../module/process/images/newIcon/kyuubi.svg';
const NODETYPE = {
......
KYUUBI: 'linkis.kyuubi.kyuubi',
}
const ext = {
......
[NODETYPE.KYUUBI]: 'kyuubi',
}
const NODEICON = {
[NODETYPE.KYUUBI]: {
icon: kyuubi,
class: {'kyuubi': true}
},
}3. 总结
边栏推荐
- 管理全局变量(二)
- Hongmeng porting i.mx6ull (11) storage device driver (based on imx6ull)
- 苏涛:对抗样本技术在互联网安全领域的应用
- 请教股票怎么开户 ,网上开户安全么?
- 【论文】Cascade RPN: Delving into high-quality region proposal network with Adaptive Convolution
- How greatsql is a popular open source database in China
- Excel上使用VBA的WebBrowser控件实现单点登录(SSO)
- 薪酬不变,每周只上四天班,英国试行全球最大规模“四天工作制”
- Why do SQL statements use indexes but still slow queries?
- 避免滥用class样式
猜你喜欢

Using kubekey to build kubernetes/kubesphere environment“

Hongmeng transplants the compiling system of i.mx6ull (VII) liteos-a
![[云原生]Kubernetes可视化界面WEBUI Kubernetes Dashboard](/img/25/9ec90f660b290823c36c1fe5a4c69e.jpg)
[云原生]Kubernetes可视化界面WEBUI Kubernetes Dashboard

Hongmeng transplantation i.mx6ull (V) overview of transplantation

PhD Debate | 自监督学习在推荐系统中的应用

混动大年,比亚迪的风评真要反转?

Why do SQL statements use indexes but still slow queries?

Meanings of 10 important concepts and charts in Data Science

鸿蒙移植i.mx6ull(六) Kconfig_GCC_Mkefile

验证回文串
随机推荐
Fcpx plug-in: motion blur FX by Ma
C listbox usage
[云原生]Kubernetes可视化界面WEBUI Kubernetes Dashboard
GreatSQL如何做中国广受欢迎的开源数据库
华为十大发明公布:高效加法网络、多目标博弈智能驾驶获奖
3 个技巧来破解你可以立即使用的 Flutter 生产力!
请教股票怎么开户 ,网上开户安全么?
道友,日增量百万级业务该如何进行数据设计呢?
【实战】Redis Cluster(下)-系统版本支持问题
使用 KubeKey 搭建 Kubernetes/KubeSphere 环境的“心路(累)历程“
一年进账34.46亿,又一个福建老乡要IPO了
期货外盘开户,还是内盘开户安全??
Hongmeng transplantation i.mx6ull (V) overview of transplantation
MySQL数据库的存储引擎
Explain the factory method
王兴张勇徐雷,谁能问鼎本地电商?
硬件基础——模拟电路
华为哈勃将再添IPO,美芯晟蛰伏十余年后冲刺科创板
Gaussdb (DWS) functions and supporting tools [Gauss is not a mathematician this time]
Leetcode 2001. 可互换矩形的组数(暴力枚举失败了)