Technical Deep Dive | Linkis 1.0.2 Installation and Usage Guide
2022-06-09 15:18:00 【InfoQ】
1. Background
2.1 Linkis
2.1.1 Core Features
2.2 DataSphereStudio
- Data development IDE: Scriptis
- Data visualization tool: Visualis
- Data quality management tool: Qualitis
- Workflow scheduling tool: Schedulis
- Data exchange tool: Exchangis
- Data API service: DataApiService
- Streaming application development and management tool: Streamis
3. Installation
Link: https://pan.baidu.com/s/17g05rtfE_JSt93Du9TXVug
Extraction code: zpep
计算层 (compute layer)
├─Linkis引擎 #Linkis engine plugin archives
│ flink_engine.zip
│ hive_engine.zip #with TEZ support
│ spark_engine.zip
└─本地集群 #local cluster configuration and JARs
│ flink_linkis.zip
│ hive_linkis.zip
│ spark_linkis.zip
└─udf #test JARs for user-defined functions
hive_udf.jar
flink_udf.jar

3.1 Component Versions

3.2 Installing the Dependencies
3.3 Preparing the Installation Packages
// For the first checkout, run the following command to initialize the build
mvn -N install
// Run the packaging command
mvn clean install -Dmaven.test.skip=true

3.4 Installation
3.4.1 Installation Environment Checks
3.4.1.1 Hardware Check
The default JVM memory of each microservice is controlled by SERVER_HEAP_SIZE; edit it in the environment script:

vim ${LINKIS_HOME}/config/linkis-env.sh
# java application default jvm memory.
export SERVER_HEAP_SIZE="128M"

Keep in mind that the LINKIS-CG-ENGINECONN engines started later consume memory on top of this.

3.4.1.2 Dependency Environment Check

Verify the engine environments before installing: HADOOP_HOME and HADOOP_CONF_DIR (check with hadoop fs -ls /); HIVE_HOME and HIVE_CONF_DIR; SPARK_HOME and SPARK_CONF_DIR (check with spark-submit --version, and spark-sql via ./spark-sql --master yarn --deploy-mode client); FLINK_HOME, FLINK_CONF_DIR and FLINK_LIB_DIR. The Hadoop, Hive, Spark and Flink variables should be configured in /etc/profile and loaded with source /etc/profile:

export JAVA_HOME=/opt/jdk1.8
export CLASSPATH=.$CLASSPATH:$JAVA_HOME/lib
export HADOOP_HOME=/opt/install/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HIVE_HOME=/opt/install/hive
export HIVE_CONF_DIR=$HIVE_HOME/conf
export FLINK_HOME=/opt/install/flink
export FLINK_CONF_DIR=/opt/install/flink/conf
export FLINK_LIB_DIR=/opt/install/flink/lib
export SPARK_HOME=/opt/install/spark
export SPARK_CONF_DIR=$SPARK_HOME/conf
export PATH=$MAVEN_HOME/bin:$HADOOP_HOME/bin:$HIVE_HOME/bin:$SPARK_HOME/bin:$SQOOP_HOME/bin/:$FLINK_HOME/bin:$FLINKX_HOME/bin:$JAVA_HOME/bin:$PATH
export CLASSPATH=.$CLASSPATH:$JAVA_HOME/lib

Switch to the installation user and confirm the variables are visible:

sudo su - ${username}
echo ${JAVA_HOME}
echo ${FLINK_HOME}

MySQL's sql_mode must not include ONLY_FULL_GROUP_BY; check and adjust sql_mode as follows:

1. Check the current sql_mode
select @@global.sql_mode;
2. Modify sql_mode
vim /etc/my.cnf
sql_mode=STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
3. Restart the MySQL service
service mysqld restart
service mysqld status

Enable the MySQL binlog:
1. Edit the configuration with vim /etc/my.cnf and add the following
server_id=1
log_bin=mysql-bin
binlog_format=ROW
expire_logs_days=30
2. Restart the MySQL service
service mysqld restart
service mysqld status
3. Check the status
show VARIABLES LIKE 'log_bin';
show global variables like "binlog%";

3.4.1.3 Installation User Check

- Check whether the hadoop user exists:
cat /etc/passwd | grep hadoop
httpfs:x:983:976:Hadoop HTTPFS:/var/lib/hadoop-httpfs:/bin/bash
mapred:x:982:975:Hadoop MapReduce:/var/lib/hadoop-mapreduce:/bin/bash
kms:x:979:972:Hadoop KMS:/var/lib/hadoop-kms:/bin/bash
- If it does not exist, create the hadoop user and add it to the hadoop group:
sudo useradd hadoop -g hadoop
- Grant the hadoop user sudo privileges:
Run vi /etc/sudoers and add the line hadoop ALL=(ALL) NOPASSWD: ALL; the file is read-only, so force-save with wq!
- Configure the installation user's environment variables:
Edit vim /home/hadoop/.bashrc and add the following:
export JAVA_HOME=/opt/jdk1.8
export HADOOP_HOME=/opt/install/hadoop
export HADOOP_CONF_DIR=/opt/install/hadoop/etc/hadoop
export HIVE_HOME=/opt/install/hive
export HIVE_CONF_DIR=/opt/install/hive/conf
export FLINK_HOME=/opt/install/flink
export FLINK_CONF_DIR=/opt/install/flink/conf
export FLINK_LIB_DIR=/opt/install/flink/lib
export SPARK_HOME=/opt/install/spark
export SPARK_CONF_DIR=/opt/install/spark/conf

3.4.1.4 Installation Command Check
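A quick manual spot-check that the basic command-line tools are present can save time here. This is a minimal sketch (the command list is an assumption — trim it to the engines you actually use):

# Rough manual spot-check of the tools the install scripts rely on.
# The exact list is an assumption; adjust it to your engines.
for cmd in java mvn mysql unzip hadoop hive spark-submit; do
  command -v "$cmd" >/dev/null 2>&1 || echo "missing command: $cmd"
done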
The installer itself verifies the required commands through bin/checkEnv.sh; commands you do not need can be commented out there to skip the check:
vim bin/checkEnv.sh

3.4.1.5 Directory Check

Make sure the directories referenced by ENGINECONN_ROOT_PATH and HDFS_USER_ROOT_PATH exist and are writable. For ENGINECONN_ROOT_PATH (a local path) run chmod -R 777 /<directory>; for HDFS_USER_ROOT_PATH (an HDFS path) run hadoop fs -chmod -R 777 /<directory>.

3.4.2 Unpacking the Installation Package
Unpack the downloaded archive (unzip). The layout is as follows:

│ wedatasphere-dss-1.0.0-dist.tar.gz #DSS backend package; the one-click install command unpacks it automatically
│ wedatasphere-dss-web-1.0.0-dist.zip #web frontend package; the one-click install command unpacks it automatically
│ wedatasphere-linkis-1.0.2-combined-package-dist.tar.gz #Linkis backend package; the one-click install command unpacks it automatically
│
├─bin
│ checkEnv.sh #pre-install command check script; commands you do not need can be commented out to skip the check
│ install.sh #one-click install command; unpacks the packages, creates the required directories, imports the metadata, etc.
│ replace.sh #internal script that pushes the unified configuration into each component
│ start-all.sh #one-click start script for all microservices; starts Linkis first, then the DSS backend, then the DSS frontend
│ stop-all.sh #one-click stop script for all microservices
│
└─conf
config.sh #unified configuration script; replace.sh copies these settings into every microservice of each component
db.sh #unified database configuration script, covering the Linkis metadata database and the Hive metastore database

3.4.3 Modifying the Configuration
The files to edit are conf/db.sh and conf/config.sh. Before assigning ports, check which ones are already in use:

# List all listening ports
netstat -ntlp
# Check whether a given port is occupied
netstat -tunlp | grep 8080

conf/db.sh:

## for DSS-Server and Eventchecker APPJOINT
MYSQL_HOST=host
MYSQL_PORT=port
MYSQL_DB=db
MYSQL_USER=user
MYSQL_PASSWORD=password
## Hive metastore configuration
HIVE_HOST=host
HIVE_PORT=port
HIVE_DB=db
HIVE_USER=user
HIVE_PASSWORD=password

conf/config.sh:

### deploy user
deployUser=hadoop
### Linkis_VERSION
LINKIS_VERSION=1.0.2
### DSS Web
DSS_NGINX_IP=127.0.0.1
DSS_WEB_PORT=8088
### DSS VERSION
DSS_VERSION=1.0.0
############## ############## Other default Linkis configuration (start) ############## ##############
### Generally local directory
WORKSPACE_USER_ROOT_PATH=file:///tmp/linkis/
### User's root hdfs path
HDFS_USER_ROOT_PATH=hdfs:///tmp/linkis
### Path to store job ResultSet:file or hdfs path
RESULT_SET_ROOT_PATH=hdfs:///tmp/linkis
### Path to store started engines and engine logs, must be local
ENGINECONN_ROOT_PATH=/appcom/tmp
### Engine environment variable configuration
HADOOP_CONF_DIR=/opt/install/hadoop/etc/hadoop
HIVE_CONF_DIR=/opt/install/hive/conf
SPARK_CONF_DIR=/opt/install/spark/conf
##YARN REST URL spark engine required
YARN_RESTFUL_URL=http://127.0.0.1:8088
### for install
LINKIS_PUBLIC_MODULE=lib/linkis-commons/public-module
## Microservice port configuration
### You can access it in your browser at the address below:http://${EUREKA_INSTALL_IP}:${EUREKA_PORT}
#LINKIS_EUREKA_INSTALL_IP=127.0.0.1 # Microservices Service Registration Discovery Center
LINKIS_EUREKA_PORT=20303
### Gateway install information
#LINKIS_GATEWAY_INSTALL_IP=127.0.0.1
LINKIS_GATEWAY_PORT=8001
### ApplicationManager
#LINKIS_MANAGER_INSTALL_IP=127.0.0.1
LINKIS_MANAGER_PORT=8101
### EngineManager
#LINKIS_ENGINECONNMANAGER_INSTALL_IP=127.0.0.1
LINKIS_ENGINECONNMANAGER_PORT=8102
### EnginePluginServer
#LINKIS_ENGINECONN_PLUGIN_SERVER_INSTALL_IP=127.0.0.1
LINKIS_ENGINECONN_PLUGIN_SERVER_PORT=8103
### LinkisEntrance
#LINKIS_ENTRANCE_INSTALL_IP=127.0.0.1
LINKIS_ENTRANCE_PORT=8104
### publicservice
#LINKIS_PUBLICSERVICE_INSTALL_IP=127.0.0.1
LINKIS_PUBLICSERVICE_PORT=8105
### cs
#LINKIS_CS_INSTALL_IP=127.0.0.1
LINKIS_CS_PORT=8108
########## End of Linkis microservice configuration #####
################### The install Configuration of all DataSphereStudio's Micro-Services #####################
# Directory for the temporary ZIP packages published to Schedulis
WDS_SCHEDULER_PATH=file:///appcom/tmp/wds/scheduler
### This service is used to provide dss-framework-project-server capability.
#DSS_FRAMEWORK_PROJECT_SERVER_INSTALL_IP=127.0.0.1
DSS_FRAMEWORK_PROJECT_SERVER_PORT=9007
### This service is used to provide dss-framework-orchestrator-server capability.
#DSS_FRAMEWORK_ORCHESTRATOR_SERVER_INSTALL_IP=127.0.0.1
DSS_FRAMEWORK_ORCHESTRATOR_SERVER_PORT=9003
### This service is used to provide dss-apiservice-server capability.
#DSS_APISERVICE_SERVER_INSTALL_IP=127.0.0.1
DSS_APISERVICE_SERVER_PORT=9004
### This service is used to provide dss-workflow-server capability.
#DSS_WORKFLOW_SERVER_INSTALL_IP=127.0.0.1
DSS_WORKFLOW_SERVER_PORT=9005
### dss-flow-Execution-Entrance
### This service is used to provide flow execution capability.
#DSS_FLOW_EXECUTION_SERVER_INSTALL_IP=127.0.0.1
DSS_FLOW_EXECUTION_SERVER_PORT=9006
### This service is used to provide dss-datapipe-server capability.
#DSS_DATAPIPE_SERVER_INSTALL_IP=127.0.0.1
DSS_DATAPIPE_SERVER_PORT=9008
########## End of DSS microservice configuration #####
############## ############## Other default configuration ############## ##############
## java application minimum jvm memory
export SERVER_HEAP_SIZE="128M"
## Email settings; they only affect the send-mail feature in DSS workflows
EMAIL_HOST=smtp.163.com
EMAIL_PORT=25
[email protected]
EMAIL_PASSWORD=xxxxx
EMAIL_PROTOCOL=smtp

3.4.4 Installation Directory and Configuration Check

Run bin/install.sh to perform the installation. The resulting directory layout is:

├── linkis
│ ├── bin #Linkis client commands, e.g. for running Hive and Spark tasks from the command line
│ │ ├── linkis-cli
│ │ ├── linkis-cli-hive
│ │ ├── linkis-cli-spark-sql
│ │ ├── linkis-cli-spark-submit
│ │ └── linkis-cli-start
│ ├── conf #configuration files of the Linkis microservices
│ │ ├── application-eureka.yml
│ │ ├── application-linkis.yml
│ │ ├── linkis-cg-engineconnmanager.properties
│ │ ├── linkis-cg-engineplugin.properties
│ │ ├── linkis-cg-entrance.properties
│ │ ├── linkis-cg-linkismanager.properties
│ │ ├── linkis-cli
│ │ │ ├── linkis-cli.properties
│ │ │ └── log4j2.xml
│ │ ├── linkis-env.sh
│ │ ├── linkis-mg-gateway.properties
│ │ ├── linkis.properties
│ │ ├── linkis-ps-cs.properties
│ │ ├── linkis-ps-publicservice.properties
│ │ ├── log4j2.xml
│ │ └── token.properties
│ ├── db #SQL scripts for initializing the Linkis metadata
│ │ ├── linkis_ddl.sql
│ │ ├── linkis_dml.sql
│ ├── lib #dependency JARs of the individual Linkis modules
│ │ ├── linkis-commons
│ │ ├── linkis-computation-governance
│ │ │ ├── linkis-cg-engineconnmanager
│ │ │ ├── linkis-cg-engineplugin
│ │ │ ├── linkis-cg-entrance
│ │ │ ├── linkis-cg-linkismanager
│ │ │ └── linkis-client
│ │ │ └── linkis-cli
│ │ ├── linkis-engineconn-plugins
│ │ │ ├── appconn
│ │ │ ├── flink
│ │ │ ├── hive
│ │ │ ├── python
│ │ │ ├── shell
│ │ │ └── spark
│ │ ├── linkis-public-enhancements
│ │ │ ├── linkis-ps-cs
│ │ │ └── linkis-ps-publicservice
│ │ └── linkis-spring-cloud-services
│ │ ├── linkis-mg-eureka
│ │ └── linkis-mg-gateway
│ ├── LICENSE
│ ├── README_CN.md
│ ├── README.md
│ └── sbin #Linkis startup scripts for the individual microservices
│ ├── common.sh
│ ├── ext
│ │ ├── linkis-cg-engineconnmanager
│ │ ├── linkis-cg-engineplugin
│ │ ├── linkis-cg-entrance
│ │ ├── linkis-cg-linkismanager
│ │ ├── linkis-common-start
│ │ ├── linkis-mg-eureka
│ │ ├── linkis-mg-gateway
│ │ ├── linkis-ps-cs
│ │ └── linkis-ps-publicservice
│ ├── linkis-daemon.sh
│ ├── linkis-start-all.sh
│ └── linkis-stop-all.sh
├── dss
│ ├── bin #DSS installation scripts
│ │ ├── appconn-install.sh
│ │ ├── checkEnv.sh
│ │ ├── excecuteSQL.sh
│ │ └── install.sh
│ ├── conf #configuration directory for each DSS microservice
│ │ ├── application-dss.yml
│ │ ├── config.sh
│ │ ├── db.sh
│ │ ├── dss-apiservice-server.properties
│ │ ├── dss-datapipe-server.properties
│ │ ├── dss-flow-execution-server.properties
│ │ ├── dss-framework-orchestrator-server.properties
│ │ ├── dss-framework-project-server.properties
│ │ ├── dss.properties
│ │ ├── dss-workflow-server.properties
│ │ ├── log4j2.xml
│ │ ├── log4j.properties
│ │ └── token.properties
│ ├── dss-appconns #integrations of DSS with other systems, e.g. visualization, data quality, scheduling
│ ├── lib #dependency JARs of the DSS microservices
│ ├── README.md
│ └── sbin #DSS startup scripts; supports one-click start and per-service start
│ ├── common.sh
│ ├── dss-daemon.sh
│ ├── dss-start-all.sh
│ ├── dss-stop-all.sh
│ └── ext
│ ├── dss-apiservice-server
│ ├── dss-datapipe-server
│ ├── dss-flow-execution-server
│ ├── dss-framework-orchestrator-server
│ ├── dss-framework-project-server
│ └── dss-workflow-server
├── web
│ ├── config.sh #frontend configuration script, e.g. the gateway address
│ ├── dist #static files of the DSS frontend
│ ├── dss #static files of the Linkis frontend (the management console is integrated from Linkis)
│ │ └── linkis
│ └── install.sh #installation and startup script; installs and configures nginx

Configuration issues to watch out for:
1. The gateway address in DSS may be wrong; edit the dss.properties configuration file and set the correct gateway address.
2. The gateway address in the web config.sh script may also be wrong and has to be corrected manually.
3. In Linkis 1.0.2 the engine directory is authorized automatically before an engine is created, which requires the IO proxy to be enabled: edit linkis-cg-engineconnmanager.properties and add wds.linkis.storage.enable.io.proxy=true.

3.4.5 Starting the Services
Start everything with bin/start-all.sh, or start each component separately: linkis/sbin/linkis-start-all.sh for Linkis, dss/sbin/dss-start-all.sh for the DSS backend, and web/install.sh for the frontend (web/install.sh installs and configures nginx). The LINKIS-CG-ENGINECONN engine services are not started here; they are launched on demand when tasks are submitted. If anything fails, check the following logs:

// 1. Linkis microservice logs; the 8 microservices started by default all write their logs here, one file per microservice
linkis/logs
// 2. Logs of the Linkis engine microservices; the engine root directory comes from `ENGINECONN_ROOT_PATH`. In general, if an engine fails to start, look at the `linkis-cg-engineconnmanager` log; if the engine starts but the task fails, look at the engine log first, and if there is no concrete information there, check the YARN logs for the actual error.
${ENGINECONN_ROOT_PATH}/hadoop/workDir/UUID/logs
// 3. DSS microservice logs; the 6 microservices started by default all write their logs here, one file per microservice
dss/logs
// 4. For frontend issues, open the browser's developer tools, find the failing request, work out which microservice interface it hits, and then check that microservice's log in the directories above
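As a quick aid (not from the original text), a sketch of how these directories can be searched when something fails; adjust the pattern and the user directory to your setup:

# Look for recent errors in the Linkis and DSS microservice logs
grep -ri "error" linkis/logs | tail -n 20
grep -ri "error" dss/logs | tail -n 20
# Engine logs live under a per-task UUID directory beneath ENGINECONN_ROOT_PATH
find ${ENGINECONN_ROOT_PATH}/hadoop -type d -name logs 2>/dev/null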

Open http://DSS_NGINX_IP:DSS_WEB_PORT in a browser. The login account and password can be found in linkis-mg-gateway.properties.

- After logging in, the home page shows the main function panels and examples;

- The Scriptis panel is the focus of this installation and testing; it is used to write Hive, Spark and Flink scripts and to manage functions;

- The management console is integrated from the Linkis frontend. It mainly covers Global History (script execution logs), Resource Management (engine resource usage, shown only when engines are running), Parameter Configuration (YARN resource queues, engine resources, etc.), Global Variables, ECM Management (ECM instances and the engines under them) and Microservice Management.

3.4.6 Functional Tests
3.4.6.1 Hive
The hive-site.xml used for the tests:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://host:9083</value>
</property>
<property>
<name>spark.master</name>
<value>yarn-cluster</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://host:3306/hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.cj.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>MySQL5.7</value>
</property>
<property>
<name>hive.auto.convert.join</name>
<value>false</value>
<description>Enables the optimization about converting common join into mapjoin</description>
</property>
</configuration>

Test SQL:

show tables;
select name, addr, id
from linkis_1
group by name, addr, id
order by id;
select a.name, a.addr, b.phone
from linkis_1 a
left join linkis_2 b on a.id = b.id
group by a.name, a.addr, b.phone
order by a.name;

The more complex test cases used here follow the TPC-DS benchmark query set:
- 99 test queries in total, following the SQL99 and SQL 2003 syntax standards; the SQL is fairly complex
- The data volume analyzed is large, and the queries answer real business questions
- The queries cover a variety of business models (analytical reporting, iterative OLAP, data mining, etc.)
- Almost all of the queries have high IO load and CPU computation demands
3.4.6.2 Spark
// 1. Make sure Spark jobs can be submitted successfully; test with the following command:
./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--executor-memory 1G \
--total-executor-cores 2 \
/opt/install/spark/examples/jars/spark-examples_2.11-2.4.3.jar \
100
// 2. Make sure Spark on Hive works in yarn mode. By default spark-sql starts in local mode, which succeeds as long as the Hive dependencies exist locally; for yarn mode, the JARs under Spark's jars directory must first be uploaded to HDFS
./spark-sql --master yarn --deploy-mode client
// Run the following SQL to test
show tables;
select name,addr,id from linkis_1 group by name,addr,id order by id;
select a.name,a.addr,b.phone from linkis_1 a left join linkis_2 b on a.id=b.id group by a.name,a.addr,b.phone order by a.name;

The Spark configuration used for the tests:

- spark-env.sh
#!/usr/bin/env bash
SPARK_CONF_DIR=/opt/install/spark/conf
HADOOP_CONF_DIR=/opt/install/hadoop/etc/hadoop
YARN_CONF_DIR=/opt/install/hadoop/etc/hadoop
SPARK_EXECUTOR_CORES=3
SPARK_EXECUTOR_MEMORY=4g
SPARK_DRIVER_MEMORY=2g

- spark-defaults.conf
spark.yarn.historyServer.address=host:18080
spark.yarn.historyServer.allowTracking=true
spark.eventLog.dir=hdfs://host/spark/eventlogs
spark.eventLog.enabled=true
spark.history.fs.logDirectory=hdfs://host/spark/hisLogs
spark.yarn.jars=hdfs://host/spark-jars/*

Test SQL:

show tables;
select name, addr, id
from linkis_1
group by name, addr, id
order by id;
select a.name, a.addr, b.phone
from linkis_1 a
left join linkis_2 b on a.id = b.id
group by a.name, a.addr, b.phone
order by a.name;

Scala test snippet:

val sql = "show tables"
val df = sqlContext.sql(sql)
df.show()

3.4.6.3 UDF Functions
- i. Usage flow
1. Develop the UDF locally and package it as a JAR.
2. Upload the JAR through the DSS Scriptis UI.
3. Create the function in the DSS UI, specifying the JAR, the function name and the function format (fill in the main class).
4. Choose whether to load it. Loading is the default: a temporary function is created when the engine initializes. Newly added or modified functions only take effect after the engine is restarted.

- ii. Load flow
1. EngineConnServer creates the EngineConn, with logic that runs before and after engine creation.
2. In the afterExecutionExecute method, UDFLoadEngineConnHook fetches all UDFs that need to be loaded, reads each UDF's register format, and registers them one by one.
3. From the load flow it follows that a UDF's lifecycle is the engine's lifecycle; after any change to a UDF the engine must be restarted for it to take effect.
4. If a UDF is marked as loaded, its JAR is placed on the engine's classpath and registered when the engine is created; otherwise the JAR is neither on the classpath nor registered. By default these are session-level functions.
5. For the detailed load flow, search the code for the UdfInfo keyword and follow the logic from there.

- iii. API call
The UDF can also be managed through the UDFApi, e.g. updating a UDF:

POST http://gateway_ip:8001/api/rest_j/v1/udf/update
{"isShared":false,"udfInfo":{"id":4,"udfName":"testudf2","description":"7777","path":"file:///tmp/linkis/hadoop/udf/hive/hive_function.jar","shared":false,"useFormat":"testudf2()","load":true,"expire":false,"registerFormat":"create temporary function testudf2 as \" com.troila.hive.udf.MaskFromEnds\"","treeId":9,"udfType":0}}
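For reference, a minimal sketch of issuing this request from the command line. Authentication against the gateway (session cookie) and the request body file are assumptions here, not part of the original guide:

# Hypothetical curl invocation of the udf/update request shown above.
# cookies.txt is assumed to hold a valid login session for the gateway, and
# udf_update.json is assumed to contain the JSON body listed above.
curl -X POST "http://gateway_ip:8001/api/rest_j/v1/udf/update" \
  -H "Content-Type: application/json" \
  -b cookies.txt \
  -d @udf_update.json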
3.4.6.4 Linkis Debugging Methods
./linkis-cli -engineType spark-2.4.3 -codeType sql -code "select count(*) from default.ct_test;" -submitUser hadoop -proxyUser hadoop
./linkis-cli -engineType hive-2.3.3 -codeType sql -code "select count(*) from default.ct_test;" -submitUser hadoop -proxyUser hadoop
./linkis-cli -engineType hive-2.3.3 -codeType sql -code "select * from \${table};" -varMap table=default.ct_test -submitUser hadoop -proxyUser hadoop

Tasks can also be submitted programmatically with the Linkis SDK:

- Add the dependency
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-computation-client</artifactId>
<version>${linkis.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-codec</artifactId>
<groupId>commons-codec</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>commons-beanutils</artifactId>
<groupId>commons-beanutils</groupId>
</exclusion>
</exclusions>
</dependency>

- Scala code example
package com.troila.bench.linkis.spark
import com.webank.wedatasphere.linkis.common.utils.Utils
import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder
import com.webank.wedatasphere.linkis.manager.label.constant.LabelKeyConstant
import com.webank.wedatasphere.linkis.ujes.client.UJESClient
import com.webank.wedatasphere.linkis.ujes.client.request.{JobSubmitAction, ResultSetAction}
import org.apache.commons.io.IOUtils
import java.util
import java.util.concurrent.TimeUnit
object ScalaClientTest {
def main(args: Array[String]): Unit = {
val user = "hadoop"
val username = "hadoop"
val password = "hadoop"
val yarnQueue = "default"
val executeCode = "select name,addr,id from linkis_1 group by name,addr,id order by id"
val gatewayUrl = "http://gateway_ip:8001"
// 1. Configure DWSClientBuilder and obtain a DWSClientConfig from it
val clientConfig = DWSClientConfigBuilder.newBuilder()
.addServerUrl(gatewayUrl) // ServerUrl, the address of the Linkis gateway, e.g. http://{ip}:{port}
.connectionTimeout(30000) // client connection timeout
.discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) // whether to enable registry discovery; if enabled, newly started gateways are discovered automatically
.loadbalancerEnabled(true) // whether to enable load balancing; meaningless unless registry discovery is enabled
.maxConnectionSize(5) // maximum number of connections, i.e. maximum concurrency
.retryEnabled(false).readTimeout(30000) // whether to retry on failure; read timeout
.setAuthenticationStrategy(new StaticAuthenticationStrategy()) // AuthenticationStrategy, the Linkis authentication method
.setAuthTokenKey(username).setAuthTokenValue(password) // auth key, usually the username; auth value, usually that user's password
.setDWSVersion("v1").build() // version of the Linkis backend protocol, currently v1
// 2. Obtain a UJESClient from the DWSClientConfig
val client = UJESClient(clientConfig)
try {
// 3. Execute the code
println("user : " + user + ", code : [" + executeCode + "]")
val startupMap = new java.util.HashMap[String, Any]()
startupMap.put("wds.linkis.yarnqueue", yarnQueue) // startup parameter configuration
// specify the labels
val labels: util.Map[String, Any] = new util.HashMap[String, Any]
// add the labels this execution depends on, e.g. the engine label
labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "spark-2.4.3")
labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, "hadoop-IDE")
labels.put(LabelKeyConstant.CODE_TYPE_KEY, "sql")
// specify the source
val source: util.Map[String, Any] = new util.HashMap[String, Any]
// variable substitution
val varMap: util.Map[String, Any] = new util.HashMap[String, Any]
// varMap.put("table", "linkis_1")
val jobExecuteResult = client.submit(JobSubmitAction.builder
.addExecuteCode(executeCode)
.setStartupParams(startupMap)
.setUser(user) // the user submitting the job
.addExecuteUser(user) // the actual execution user
.setLabels(labels)
.setSource(source)
.setVariableMap(varMap)
.build) // the requesting user; used for user-level multi-tenant isolation
println("execId: " + jobExecuteResult.getExecID + ", taskId: " + jobExecuteResult.taskID)
// 4. Get the execution status of the script
var jobInfoResult = client.getJobInfo(jobExecuteResult)
val sleepTimeMills: Int = 1000
while (!jobInfoResult.isCompleted) {
// 5. Get the execution progress of the script
val progress = client.progress(jobExecuteResult)
val progressInfo = if (progress.getProgressInfo != null) progress.getProgressInfo.toList else List.empty
println("progress: " + progress.getProgress + ", progressInfo: " + progressInfo)
Utils.sleepQuietly(sleepTimeMills)
jobInfoResult = client.getJobInfo(jobExecuteResult)
}
if (!jobInfoResult.isSucceed) {
println("Failed to execute job: " + jobInfoResult.getMessage)
throw new Exception(jobInfoResult.getMessage)
}
// 6. Get the job information of the script
val jobInfo = client.getJobInfo(jobExecuteResult)
// 7. Get the list of result sets (submitting multiple SQL statements at once produces multiple result sets)
val resultSetList = jobInfoResult.getResultSetList(client)
println("All result set list:")
resultSetList.foreach(println)
val oneResultSet = jobInfo.getResultSetList(client).head
// 8. Get the concrete result set content from one result set's information
val fileContents = client.resultSet(ResultSetAction.builder().setPath(oneResultSet).setUser(jobExecuteResult.getUser).build()).getFileContent
println("First fileContents: ")
println(fileContents)
} catch {
case e: Exception => {
e.printStackTrace()
}
}
IOUtils.closeQuietly(client)
}
}
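Not part of the original guide, but one possible way to run the example above, assuming it sits in a Maven project with the Scala compiler plugin configured and a reachable gateway:

# Hypothetical: compile and run the client example above (object ScalaClientTest).
# Assumes a Maven project with the scala-maven-plugin configured.
mvn -q compile exec:java -Dexec.mainClass=com.troila.bench.linkis.spark.ScalaClientTest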
3.5 Extended Features
3.5.1 Hive with the TEZ Engine
3.5.1.1 Linkis Side

Copy the tez-* JARs into the Hive engine plugin's lib directory, from which they are made available to the engineConnPublickDir when engines start:

// linkis/lib/linkis-engineconn-plugins/hive/dist/v2.3.7/lib
// When the engine starts for the first time, a lib.zip cache package is generated from this directory; if the JARs under lib are changed but the cache package is not regenerated, the new JARs will not be picked up
tez-api-0.9.2.jar
tez-build-tools-0.9.2.jar
tez-common-0.9.2.jar
tez-dag-0.9.2.jar
tez-examples-0.9.2.jar
tez-ext-service-tests-0.9.2.jar
tez-history-parser-0.9.2.jar
tez-javadoc-tools-0.9.2.jar
tez-job-analyzer-0.9.2.jar
tez-mapreduce-0.9.2.jar
tez-protobuf-history-plugin-0.9.2.jar
tez-runtime-internals-0.9.2.jar
tez-runtime-library-0.9.2.jar
tez-tests-0.9.2.jar
tez-yarn-timeline-history-0.9.2.jar
tez-yarn-timeline-history-with-acls-0.9.2.jar
hadoop-yarn-registry-2.8.5.jar

3.5.1.2 Local Cluster Configuration

- Prepare the TEZ dependencies, upload them to HDFS and grant permissions.
# The TEZ documentation states that this path may be either an archive or the unpacked JAR files; based on testing, uploading the unpacked JARs directly is recommended.
hdfs dfs -mkdir /tez_linkis
# The tez directory contains the full set of JARs from the TEZ build
hdfs dfs -put tez /tez_linkis
# Grant permissions so that the Linkis submit user can read the tez files
hadoop fs -chmod -R 755 /tez_linkis

- Modify hive-site.xml: switch the execution engine and configure container mode
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://host:9083</value>
</property>
<property>
<name>spark.master</name>
<value>yarn-cluster</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://host:3306/hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.cj.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>MySQL5.7</value>
</property>
<property>
<name>hive.execution.engine</name>
<value>tez</value>
<description>Switch the Hive execution engine to tez</description>
</property>
<!-- container -->
<property>
<name>hive.execution.mode</name>
<value>container</value>
</property>
</configuration>
- Modify tez-site.xml under the Hadoop configuration directory (${HADOOP_CONF_DIR}) to configure the TEZ dependencies
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>tez.lib.uris</name>
<value>${fs.defaultFS}/tez_linkis/tez</value>
</property>
<!-- tez.lib.uris.classpath mainly points to additional dependencies such as custom UDFs and can be left unset -->
<property>
<name>tez.lib.uris.classpath</name>
<value>${fs.defaultFS}/tez_linkis/tez</value>
</property>
<property>
<name>tez.use.cluster.hadoop-libs</name>
<value>true</value>
</property>
<property>
<name>tez.history.logging.service.class</name>
<value>org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService</value>
</property>
</configuration>

LLAP mode (a misconfigured LLAP setup leads to the "No LLAP Daemons are running" error covered in section 4.1.4):
- Refer to the container mode steps above to upload and configure the TEZ dependencies
- Modify hive-site.xml: switch the execution engine and configure llap mode
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://host:9083</value>
</property>
<property>
<name>spark.master</name>
<value>yarn-cluster</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://host:3306/hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.cj.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>MySQL5.7</value>
</property>
<property>
<name>hive.execution.engine</name>
<value>tez</value>
<description>Switch the Hive execution engine to tez</description>
</property>
<!-- llap -->
<property>
<name>hive.execution.mode</name>
<value>llap</value>
</property>
<property>
<name>hive.llap.execution.mode</name>
<value>all</value>
</property>
<property>
<name>hive.llap.daemon.service.hosts</name>
<value>@llap_service</value>
</property>
<property>
<name>hive.zookeeper.quorum</name>
<value>ct4:2181,ct5:2181,ct6:2181</value>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
</property>
<property>
<name>hive.llap.daemon.num.executors</name>
<value>1</value>
</property>
</configuration>

- Deploy the LLAP service
1. Install Apache Slider and configure SLIDER_HOME, PATH and the related environment variables.
2. Run the hive command to generate the LLAP launch package; make sure the service name here matches the one configured in hive-site.xml.
hive --service llap --name llap_service --instances 2 --cache 512m --executors 2 --iothreads 2 --size 1024m --xmx 512m --loglevel INFO --javaHome /opt/jdk1.8
3. Since Linkis submits tasks as the hadoop user, the LLAP service must be started as the hadoop user so that the TEZ application can find the LLAP processes. If Linkis submits jobs as another user, LLAP must be started as that same user; the submit user can be specified in Linkis, and the DSS console uses hadoop by default.
su hadoop;./llap-slider-31Aug2021/run.sh
4. Verify the service: the llap_service application is submitted successfully on the YARN UI with User hadoop, and the jps command on the server shows a LlapDaemon process.
5. As long as the submit user is the same, the service can be used by other applications, so it only needs to be started on one Hive node; the other Hive nodes do not need Slider or the llap-slider launch package.

3.5.1.3 Linkis Script Test
If the submit user does not match the user that started the LLAP service, tasks fail with "No LLAP Daemons are running".

// userCreator can be specified as hadoop-IDE, in which case the submit user is hadoop.
POST http://gateway_ip:8001/api/rest_j/v1/entrance/submit
{
"executionContent": {"code": "select name,addr,id from linkis_1 group by name,addr,id order by id", "runType": "sql"},
"params": {"variable": {}, "configuration": {}},
"source": {"scriptPath": ""},
"labels": {
"engineType": "hive-2.3.7",
"userCreator": "root-IDE"
}
}

3.5.2 Flink Engine Support
Connector JARs must be present both in the Linkis Flink engine plugin directory and in Flink's lib directory (${flink_lib_dir}). Typical errors:

- An error like Could not find any factory for identifier 'elasticsearch-7' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath. usually means the corresponding connector JAR is missing from the Linkis engine plugin directory, because the JARs in that directory are put on the classpath when the engine starts.
- An error like Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat, even though the JAR containing the class is already on the classpath, usually means the JAR is missing from Flink's lib directory.
- For connectors that provide a sql-connector package, prefer it: it already bundles the connector JAR, so using the sql-connector JAR alone is enough.
- For some special data formats you need to build the Flink format yourself and place it in both the lib directory and the Linkis engine directory; CSV and JSON are supported out of the box, while formats such as Debezium, Maxwell and Canal must be built manually.
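To make the two locations concrete, here is a small sketch of placing a connector JAR (the same two directories are listed again in section 3.5.2.2); the JAR name is only an example:

# Make a connector JAR visible to both Flink and the Linkis Flink engine plugin
cp flink-sql-connector-kafka_2.11-1.12.2.jar ${FLINK_HOME}/lib/
cp flink-sql-connector-kafka_2.11-1.12.2.jar ${LINKIS_HOME}/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib/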
3.5.2.1 Installing Flink Locally

- flink-conf.yaml

vim flink-conf.yaml

jobmanager.archive.fs.dir: hdfs://ct4:8020/flink-test
env.java.home: /opt/jdk1.8
classloader.resolve-order: parent-first
parallelism.default: 1
taskmanager.numberOfTaskSlots: 1
historyserver.archive.fs.dir: hdfs://ct4:8020/flink
jobmanager.rpc.port: 6123
taskmanager.heap.size: 1024m
jobmanager.heap.size: 1024m
rest.address: ct6

- Configure the environment variables
- Build process:
1. Format the code first
mvn spotless:apply
2. Package and build
mvn clean install -Dmaven.test.skip=true

3.5.2.2 Adding the Flink Engine Plugin
1. Manually build the Flink plugin in the Linkis project; when the build finishes, copy and upload flink-engineconn.zip
mvn clean install -Dmaven.test.skip=true
2. Unpack flink-engineconn.zip into the `${LINKIS_HOME}/lib/linkis-engineconn-plugins` directory
unzip flink-engineconn.zip
3. Upload the required connector JARs and data-format JARs; there are two directories to upload to, for example:
${LINKIS_HOME}/lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib
${FLINK_HOME}/lib
4. Refresh the engines. Engines can be hot-loaded through a RESTful call to the `LINKIS-CG-ENGINEPLUGIN` service; its port can be found in the configuration files.
POST http://LINKIS-CG-ENGINEPLUGIN_IP:LINKIS-CG-ENGINEPLUGIN_PORT/api/rest_j/v1/rpc/receiveAndReply
{
"method": "/enginePlugin/engineConn/refreshAll"
}
5. Optional: if the new engine's parameters need to be managed dynamically, insert them into the Linkis metadata database so they can be edited visually under Console --> Parameter Configuration. Use the initialization SQL statements and the Flink plugin configuration as a reference for the inserts.

- Basic test
SELECT 'linkis flink engine test!!!';
SELECT name, COUNT(*) AS cnt
FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name)
GROUP BY name;

3.5.2.3 Debugging Flink Connectors
The connector JARs used below are available at:
Link: https://pan.baidu.com/s/17g05rtfE_JSt93Du9TXVug
Extraction code: zpep

- Following the build method above, build the flink-sql-connector-kafka_2.11-1.12.2.jar package and upload it to the two directories mentioned earlier
- Test script
CREATE TABLE source_kafka
(
id STRING,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'flink_sql_1',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'ct4:9092,ct5:9092,ct6:9092',
'format' = 'json'
);
CREATE TABLE sink_kafka
(
id STRING,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'flink_sql_3',
'properties.bootstrap.servers' = 'ct4:9092,ct5:9092,ct6:9092',
'key.format' = 'json',
'value.format' = 'json'
);
INSERT INTO sink_kafka
SELECT `id`,
`name`,
`age`
FROM source_kafka;

- Upload flink-connector-jdbc_2.11-1.12.2.jar and mysql-connector-java-5.1.49.jar to the two directories above
- Test script
CREATE TABLE source_mysql
(
id STRING,
name STRING,
age int,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://host:3306/computation',
'table-name' = 'flink_sql_1',
'username' = 'root',
'password' = 'MySQL5.7'
);
CREATE TABLE sink_kafka
(
id STRING,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'flink_sql_3',
'properties.bootstrap.servers' = 'ct4:9092,ct5:9092,ct6:9092',
'key.format' = 'json',
'value.format' = 'json'
);
INSERT INTO sink_kafka
SELECT `id`,
`name`,
`age`
FROM source_mysql;

- Upload flink-connector-mysql-cdc-1.2.0.jar and mysql-connector-java-5.1.49.jar to the two directories above
- Test script
CREATE TABLE mysql_binlog
(
id STRING NOT NULL,
name STRING,
age INT
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'host',
'port' = '3306',
'username' = 'root',
'password' = 'MySQL5.7',
'database-name' = 'flink_sql_db',
'table-name' = 'flink_sql_2',
'debezium.snapshot.locking.mode' = 'none' -- recommended; otherwise a table lock is required
);
CREATE TABLE sink_kafka
(
id STRING,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'flink_sql_3',
'properties.bootstrap.servers' = 'ct4:9092,ct5:9092,ct6:9092',
'key.format' = 'json',
'value.format' = 'json'
);
INSERT INTO sink_kafka
SELECT `id`,
`name`,
`age`
FROM mysql_binlog;

- Upload flink-sql-connector-elasticsearch7_2.11-1.12.2.jar to the two directories above
- Test script
CREATE TABLE mysql_binlog
(
id STRING NOT NULL,
name STRING,
age INT
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'host',
'port' = '3306',
'username' = 'root',
'password' = 'MySQL5.7',
'database-name' = 'flink_sql_db',
'table-name' = 'flink_sql_2',
'debezium.snapshot.locking.mode' = 'none' -- recommended; otherwise a table lock is required
);
CREATE TABLE sink_es
(
id STRING,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://host:9200',
'index' = 'flink_sql_cdc'
);
INSERT INTO sink_es
SELECT `id`,
`name`,
`age`
FROM mysql_binlog;

3.5.2.4 Developing a Custom Connector

Changes made in the custom Redis connector:
1. Added connection configuration and handling logic for standalone Redis
2. Removed the deprecated code and implemented the dynamic sink with the newer `DynamicTableSink` and `DynamicTableSinkFactory` APIs

- Upload flink-connector-redis_2.11.jar to the two directories above
- Test script
CREATE TABLE datagen
(
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.name.length' = '10'
);
CREATE TABLE redis
(
name STRING,
id INT
) WITH (
'connector' = 'redis',
'redis.mode' = 'single',
'command' = 'SETEX',
'single.host' = '172.0.0.1',
'single.port' = '6379',
'single.db' = '0',
'key.ttl' = '60',
'single.password' = 'password'
);
insert into redis
select name, id
from datagen;

- Upload flink-connector-mongodb_2.11.jar to the two directories above
- Test script
CREATE TABLE datagen
(
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.name.length' = '10'
);
CREATE TABLE mongoddb
(
id INT,
name STRING
) WITH (
'connector' = 'mongodb',
'database' = 'mongoDBTest',
'collection' = 'flink_test',
'uri' = 'mongodb://user:[email protected]:27017/?authSource=mongoDBTest',
'maxConnectionIdleTime' = '20000',
'batchSize' = '1'
);
insert into mongoddb
select id, name
from datagen;

3.5.2.5 Submitting Flink Jobs

- Add the linkis-computation-client pom dependency
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-computation-client</artifactId>
<version>${linkis.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-codec</artifactId>
<groupId>commons-codec</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>commons-beanutils</artifactId>
<groupId>commons-beanutils</groupId>
</exclusion>
</exclusions>
</dependency>
Configure linkis.properties under resources to specify the gateway address:
wds.linkis.server.version=v1
wds.linkis.gateway.url=http://host:8001/
- Code example
import com.webank.wedatasphere.linkis.common.conf.Configuration
import com.webank.wedatasphere.linkis.computation.client.once.simple.SimpleOnceJob
import com.webank.wedatasphere.linkis.computation.client.utils.LabelKeyUtils
import com.webank.wedatasphere.linkis.manager.label.constant.LabelKeyConstant
/**
* Created on 2021/8/24.
*
* @author MariaCarrie
*/
object OnceJobTest {
def main(args: Array[String]): Unit = {
val sql =
"""CREATE TABLE source_from_kafka_8 (
| id STRING,
| name STRING,
| age INT
|) WITH (
| 'connector' = 'kafka',
| 'topic' = 'flink_sql_1',
| 'scan.startup.mode' = 'earliest-offset',
| 'properties.bootstrap.servers' = 'ct4:9092,ct5:9092,ct6:9092',
| 'format' = 'json'
|);
|CREATE TABLE sink_es_table1 (
| id STRING,
| name STRING,
| age INT,
| PRIMARY KEY (id) NOT ENFORCED
|) WITH (
| 'connector' = 'elasticsearch-7',
| 'hosts' = 'http://host:9200',
| 'index' = 'flink_sql_8'
|);
|INSERT INTO
| sink_es_table1
|SELECT
| `id`,
| `name`,
| `age`
|FROM
| source_from_kafka_8;
|""".stripMargin
val onceJob = SimpleOnceJob.builder().setCreateService("Flink-Test").addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY, "flink-1.12.2")
.addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY, "hadoop-Streamis").addLabel(LabelKeyUtils.ENGINE_CONN_MODE_LABEL_KEY, "once")
.addStartupParam(Configuration.IS_TEST_MODE.key, true)
.addStartupParam("flink.taskmanager.numberOfTaskSlots", 4)
.addStartupParam("flink.container.num", 4)
.addStartupParam("wds.linkis.engineconn.flink.app.parallelism", 8)
.addStartupParam(Configuration.IS_TEST_MODE.key, true)
.setMaxSubmitTime(300000)
.addExecuteUser("hadoop").addJobContent("runType", "sql").addJobContent("code", sql).addSource("jobName", "OnceJobTest")
.build()
onceJob.submit()
onceJob.waitForCompleted()
System.exit(0)
}
}
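After the OnceJob is submitted, one quick sanity check (not shown in the original example) is to confirm that the Flink application actually appeared on YARN:

# The submitted Flink streaming job should show up as a running YARN application
yarn application -list | grep -i flink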
4. Best Practices
4.1 Hive
4.1.1 Engine fails to start due to insufficient permissions
- Problem
- Error details
Error: Could not find or load main class com.webank.wedatasphere.linkis.engineconn.launch.EngineConnServer
Caused by: LinkisException{errCode=10010, desc='DataWorkCloud service must set the version, please add property [[wds.linkis.server.version]] to properties file.', ip='null', port=0, serviceKind='null'}
- Solution
The engineConnPublickDir used by engineConnExec.sh lacks the necessary permissions, so the engine class cannot be loaded. The workaround is to grant permissions on the engineConnPublickDir when engine resources are initialized: patch the handleInitEngineConnResources logic in the linkis-engineconn-manager-server module, rebuild it, and replace the JAR under linkis/lib/linkis-computation-governance/linkis-cg-engineconnmanager. The patch used:

// todo fix bug. Failed to load com.webank.wedatasphere.linkis.engineconn.launch.EngineConnServer.
val publicDir = localDirsHandleService.getEngineConnPublicDir
val bmlResourceDir = Paths.get(publicDir).toFile.getPath
val permissions = Array(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE, PosixFilePermission.OWNER_EXECUTE, PosixFilePermission.GROUP_READ, PosixFilePermission.GROUP_WRITE, PosixFilePermission.GROUP_EXECUTE, PosixFilePermission.OTHERS_READ, PosixFilePermission.OTHERS_WRITE, PosixFilePermission.OTHERS_EXECUTE)
// grant permissions on the root path
warn(s"Start changePermission ${ENGINECONN_ROOT_DIR}")
changePermission(ENGINECONN_ROOT_DIR, true, permissions)
private def changePermission(pathStr: String, isRecurisive: Boolean, permissions: Array[PosixFilePermission]): Unit = {
val path = Paths.get(pathStr)
if (!Files.exists(path)) {
warn(s"ChangePermission ${pathStr} not exists!")
return
}
try {
val perms = new util.HashSet[PosixFilePermission]()
for (permission <- permissions) {
perms.add(permission)
}
Files.setPosixFilePermissions(path, perms)
warn(s"Finish setPosixFilePermissions ${pathStr} ")
} catch {
case e: IOException =>
if (e.isInstanceOf[UserPrincipalNotFoundException]) {
return
}
e.printStackTrace()
}
// when the path is a directory, recursively set file permissions
if (isRecurisive && Files.isDirectory(path)) {
try {
val ds = Files.newDirectoryStream(path)
import java.io.File
import scala.collection.JavaConversions._
for (subPath <- ds) {
warn(s"Recurisive setPosixFilePermissions ${subPath.getFileName} ")
changePermission(pathStr + File.separator + subPath.getFileName, true, permissions)
}
} catch {
case e: Exception => e.printStackTrace()
}
}
}

4.1.2 Container exited with a non-zero exit code 1
- Problem
- Error details
2021-08-30 11:18:33.018 ERROR SubJob : 73 failed to execute task, code : 21304, reason : Task is Failed,errorMsg: errCode: 12003 ,desc: MatchError: LinkisException{errCode=30002, desc='failed to init engine .reason:SessionNotRunning: TezSession has already shutdown. Application application_1630056358308_0012 failed 2 times due to AM Container for appattempt_1630056358308_0012_000002 exited with exitCode: 1
Error reported by the application on YARN: `Error: Could not find or load main class org.apache.tez.dag.app.DAGAppMaster`
- Solution
Check that tez.lib.uris in tez-site.xml points to a valid TEZ dependency path on HDFS.

4.1.3 NoSuchMethodError
- Problem
Occurs when running Hive on TEZ with the hive.execution.mode configuration described above.
- Error details
Error on the Linkis console: return code 1 from org.apache.hadoop.hive.ql.exec.tez.TezTask
Error from the application on YARN:
2021-08-30 16:04:35,564 [FATAL] [[email protected]] |yarn.YarnUncaughtExceptionHandler|: Thread Thread[[email protected],5,main] threw an Error. Shutting down now...
java.lang.NoSuchMethodError: com.google.common.base.Stopwatch.elapsed(Ljava/util/concurrent/TimeUnit;)J
at org.apache.hadoop.hive.common.JvmPauseMonitor$Monitor.run(JvmPauseMonitor.java:185)
at java.lang.Thread.run(Thread.java:748)
- Solution
The conflict comes from mismatched guava versions between the JARs under hive/lib and the TEZ JARs referenced by tez.lib.uris; align the versions in the two locations.

4.1.4 No LLAP Daemons are running
- Problem
Occurs when hive.execution.mode is set to llap.
- Error details
2021-08-31 18:05:11.421 ERROR [BDP-Default-Scheduler-Thread-3] SessionState 1130 printError - Status: Failed
Dag received [DAG_TERMINATE, SERVICE_PLUGIN_ERROR] in RUNNING state.
2021-08-31 18:05:11.421 ERROR [BDP-Default-Scheduler-Thread-3] SessionState 1130 printError - Dag received [DAG_TERMINATE, SERVICE_PLUGIN_ERROR] in RUNNING state.
Error reported by TaskScheduler [[2:LLAP]][SERVICE_UNAVAILABLE] No LLAP Daemons are running
2021-08-31 18:05:11.422 ERROR [BDP-Default-Scheduler-Thread-3] SessionState 1130 printError - Error reported by TaskScheduler [[2:LLAP]][SERVICE_UNAVAILABLE] No LLAP Daemons are running
Vertex killed, vertexName=Reducer 3, vertexId=vertex_1630056358308_0143_1_02, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to DAG_TERMINATED, failedTasks:0 killedTasks:1, Vertex vertex_1630056358308_0143_1_02 [Reducer 3] killed/failed due to:DAG_TERMINATED]
2021-08-31 18:05:11.422 ERROR [BDP-Default-Scheduler-Thread-3] SessionState 1130 printError - Vertex killed, vertexName=Reducer 3, vertexId=vertex_1630056358308_0143_1_02, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to DAG_TERMINATED, failedTasks:0 killedTasks:1, Vertex vertex_1630056358308_0143_1_02 [Reducer 3] killed/failed due to:DAG_TERMINATED]
Vertex killed, vertexName=Reducer 2, vertexId=vertex_1630056358308_0143_1_01, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to DAG_TERMINATED, failedTasks:0 killedTasks:1, Vertex vertex_1630056358308_0143_1_01 [Reducer 2] killed/failed due to:DAG_TERMINATED]
- Solution
Make sure the LLAP service is running and was started by the same user that submits the Linkis tasks, as described in the LLAP deployment steps in section 3.5.1.2.
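A couple of quick checks, mirroring the verification steps from section 3.5.1.2:

# The LLAP application should be running on YARN and a LlapDaemon process should be visible locally
yarn application -list | grep llap_service
jps | grep LlapDaemon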
4.2 Spark
4.2.1 ClassNotFoundException
- Problem
- Error details
68e048a8-c4b2-4bc2-a049-105064bea6dc: at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
68e048a8-c4b2-4bc2-a049-105064bea6dc: at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
68e048a8-c4b2-4bc2-a049-105064bea6dc: at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
68e048a8-c4b2-4bc2-a049-105064bea6dc: at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
68e048a8-c4b2-4bc2-a049-105064bea6dc: at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
68e048a8-c4b2-4bc2-a049-105064bea6dc: at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
68e048a8-c4b2-4bc2-a049-105064bea6dc: at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
68e048a8-c4b2-4bc2-a049-105064bea6dc: at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
68e048a8-c4b2-4bc2-a049-105064bea6dc:Caused by: java.lang.ClassNotFoundException: scala.Product$class
68e048a8-c4b2-4bc2-a049-105064bea6dc: at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
68e048a8-c4b2-4bc2-a049-105064bea6dc: at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
68e048a8-c4b2-4bc2-a049-105064bea6dc: at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
68e048a8-c4b2-4bc2-a049-105064bea6dc: at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
68e048a8-c4b2-4bc2-a049-105064bea6dc: ... 20 more
- Solution
The missing scala.Product$class indicates a Scala version mismatch: make sure the Scala version of the submitted dependencies matches the Scala version Spark was built with.

4.2.2 ClassCastException
- Problem
Occurs when running spark-sql tasks.
- Error details
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
- Solution
This is caused by inconsistent JARs between the driver and the executors when running spark-sql in yarn mode. Upload the Spark JARs to HDFS, point spark.yarn.jars in spark-defaults.conf at that HDFS path, and verify with ./spark-sql --master yarn --deploy-mode client.

4.3 Flink
4.3.1 method did not exist
- Problem
- Error details
***************************
APPLICATION FAILED TO START
***************************
Description:
An attempt was made to call a method that does not exist. The attempt was made from the following location:
org.springframework.boot.autoconfigure.mongo.MongoClientFactorySupport.applyUuidRepresentation(MongoClientFactorySupport.java:85)
The following method did not exist:
com.mongodb.MongoClientSettings$Builder.uuidRepresentation(Lorg/bson/UuidRepresentation;)Lcom/mongodb/MongoClientSettings$Builder;
- Solution
This is a MongoDB client version conflict: the mongo client on the classpath does not provide the MongoClientSettings.Builder.uuidRepresentation method expected by the Spring Boot auto-configuration; use a mongo client version that matches it.

5. References
- https://github.com/WeBankFinTech/Linkis-Doc
- https://github.com/WeBankFinTech/DataSphereStudio-Doc
- https://github.com/apache/tez
- Flink Table & SQL Connectors documentation: https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/connectors/
6. Appendix: About WeDataSphere
