Flink Yarn Per Job - CliFrontend
2022-08-01 23:43:00 【hyunbar】

Launching Per-Job mode in old and new versions
- Old versions (<= 1.10)
flink run -m yarn-cluster -c xxx xxx.jar
- New versions (>= 1.11)
flink run -t yarn-per-job -c xxx xxx.jar
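The three processes started in Per-Job mode
CliFrontend
- Parse the arguments
- Wrap them in a CommandLine
- Build the configuration
- Run the user code: execute()
- Generate the StreamGraph
- Executor: generate the JobGraph
- Cluster descriptor: upload the jar and configuration, and assemble the command submitted to YARN
- YarnClient submits the application
YarnJobClusterEntryPoint: the entry class executed by the AM
- Create and start the Dispatcher
- Create and start the ResourceManager (the SlotManager is what actually manages resources and requests them from YARN)
- The Dispatcher starts the JobMaster, which generates the ExecutionGraph (the SlotPool is what actually sends the requests)
- The SlotPool requests resources from the SlotManager, and the SlotManager requests them from YARN (starting the nodes)
YarnTaskExecutorRunner: the TaskManager entry class in YARN mode
- Start the TaskExecutor
- Register slots with the ResourceManager
- The ResourceManager allocates slots
- The TaskExecutor receives the allocation instruction and offers the slots to the JobMaster
- The JobMaster submits tasks to the TaskExecutor for execution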
Program entry point

flink-1.12.0\flink-dist\src\main\flink-bin\bin\flink
#!/usr/bin/env bash
target="$0"
iteration=0
while [ -L "$target" ]; do
if [ "$iteration" -gt 100 ]; then
echo "Cannot resolve path: You have a cyclic symlink in $target."
break
fi
ls=`ls -ld -- "$target"`
target=`expr "$ls" : '.* -> \(.*\)$'`
iteration=$((iteration + 1))
done
# Convert relative path to absolute path
bin=`dirname "$target"`
# get flink config
. "$bin"/config.sh
if [ "$FLINK_IDENT_STRING" = "" ]; then
FLINK_IDENT_STRING="$USER"
fi
CC_CLASSPATH=`constructFlinkClassPath`
log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
# Add Client-specific JVM options
FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_CLI}"
# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
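The symlink-resolution loop at the top of the script can be tried in isolation. The following is a minimal sketch, not part of Flink: the function name resolve_link and the temporary files are assumptions made for illustration, and unlike the original (which only prints a message and breaks), this helper returns a non-zero status when it gives up on a cyclic link.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not in Flink): follow a chain of symlinks using the
# same ls/expr technique as bin/flink.
resolve_link() {
  local target="$1" iteration=0 listing
  while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
      echo "Cannot resolve path: You have a cyclic symlink in $target." >&2
      return 1
    fi
    # "ls -ld" prints "... link -> destination"; expr extracts the destination.
    listing=$(ls -ld -- "$target")
    target=$(expr "$listing" : '.* -> \(.*\)$')
    iteration=$((iteration + 1))
  done
  printf '%s\n' "$target"
}

# Demo with throwaway files: link2 -> link1 -> real
tmp=$(mktemp -d)
touch "$tmp/real"
ln -s "$tmp/real" "$tmp/link1"
ln -s "$tmp/link1" "$tmp/link2"
resolved=$(resolve_link "$tmp/link2")
echo "resolved: $resolved"

# Two links pointing at each other form a cycle, which the counter catches.
ln -s "$tmp/loop_b" "$tmp/loop_a"
ln -s "$tmp/loop_a" "$tmp/loop_b"
if resolve_link "$tmp/loop_a" >/dev/null 2>&1; then cyclic=no; else cyclic=yes; fi
echo "cyclic link detected: $cyclic"
rm -rf "$tmp"
```

The demo uses absolute link targets on purpose: like the loop in the script, this sketch does not rewrite a relative target against the directory of the link.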
1) Entry class (CliFrontend)
exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" \
-classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" \
org.apache.flink.client.cli.CliFrontend "$@"
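The last token of the exec line hands every command-line argument to CliFrontend unchanged. A small sketch of why the quotes matter, assuming a stand-in function show_args in place of the JVM (all function names and the sample arguments here are hypothetical):

```shell
#!/usr/bin/env bash
# Hypothetical stand-in for the JVM: prints each argument it receives
# on its own line, wrapped in angle brackets.
show_args() {
  printf '<%s>\n' "$@"
}

# Quoted "$@": every original argument is forwarded as exactly one argument.
forward_quoted() { show_args "$@"; }

# Unquoted $@: arguments are split again on whitespace.
forward_unquoted() { show_args $@; }

quoted=$(forward_quoted run -c com.example.Main "my job.jar")
unquoted=$(forward_unquoted run -c com.example.Main "my job.jar")

echo "quoted:"
echo "$quoted"      # 4 arguments, the last one is <my job.jar>
echo "unquoted:"
echo "$unquoted"    # 5 arguments, <my> and <job.jar> are separate
```

With the quoted form, a jar path or program argument containing spaces reaches the main method as a single element of args.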
2) Relevant settings in config.sh
. "$bin"/config.sh
JAVA_RUN (java, unless JAVA_HOME points to a directory)
if [ "${UNAME:0:6}" == "CYGWIN" ]; then
JAVA_RUN=java
else
if [[ -d $JAVA_HOME ]]; then
JAVA_RUN=$JAVA_HOME/bin/java
else
JAVA_RUN=java
fi
fi
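The JAVA_RUN decision above can be exercised without touching the real environment. This is a sketch under stated assumptions: the function name pick_java_run and its two parameters (a uname string and a JAVA_HOME value) are invented for illustration, and the Cygwin check is written with case so the snippet also runs in a plain POSIX shell.

```shell
#!/usr/bin/env bash
# Hypothetical function (not in config.sh): the same decision as the
# JAVA_RUN block, with the inputs passed in explicitly.
pick_java_run() {
  local uname_str="$1" java_home="$2"
  case "$uname_str" in
    CYGWIN*)
      # On Cygwin, rely on whatever "java" is on the PATH.
      echo java
      ;;
    *)
      if [ -d "$java_home" ]; then
        echo "$java_home/bin/java"
      else
        echo java
      fi
      ;;
  esac
}

fake_home=$(mktemp -d)                        # stands in for a real JAVA_HOME
pick_java_run "Linux" ""                      # JAVA_HOME unset: plain "java"
pick_java_run "Linux" "$fake_home"            # directory exists: $JAVA_HOME/bin/java
pick_java_run "CYGWIN_NT-10.0" "$fake_home"   # Cygwin: plain "java"
rm -rf "$fake_home"
```

Note that only the existence of the directory is checked, not whether bin/java inside it is actually executable.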
JVM_ARGS (memory settings belong in conf/flink-conf.yaml, not here)
# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# JobManagerOptions#TOTAL_PROCESS_MEMORY and TaskManagerOptions#TOTAL_PROCESS_MEMORY for that!
if [ -z "${JVM_ARGS}" ]; then
JVM_ARGS=""
fi
HADOOP_CONF_DIR (typically $HADOOP_HOME/etc/hadoop)
# Check if deprecated HADOOP_HOME is set, and specify config path to HADOOP_CONF_DIR if it's empty.
if [ -z "$HADOOP_CONF_DIR" ]; then
if [ -n "$HADOOP_HOME" ]; then
# HADOOP_HOME is set. Check if its a Hadoop 1.x or 2.x HADOOP_HOME path
if [ -d "$HADOOP_HOME/conf" ]; then
# It's Hadoop 1.x
HADOOP_CONF_DIR="$HADOOP_HOME/conf"
fi
if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
# It's Hadoop 2.2+
HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
fi
fi
fi
# if neither HADOOP_CONF_DIR nor HADOOP_CLASSPATH are set, use some common default (if available)
if [ -z "$HADOOP_CONF_DIR" ] && [ -z "$HADOOP_CLASSPATH" ]; then
if [ -d "/etc/hadoop/conf" ]; then
echo "Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set."
HADOOP_CONF_DIR="/etc/hadoop/conf"
fi
fi
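The lookup order for HADOOP_CONF_DIR is easier to follow when it is wrapped in a function and fed test inputs. A minimal sketch, assuming a hypothetical function resolve_hadoop_conf_dir whose four parameters (current HADOOP_CONF_DIR, HADOOP_HOME, HADOOP_CLASSPATH, and a fallback directory that defaults to /etc/hadoop/conf) are not part of config.sh:

```shell
#!/usr/bin/env bash
# Hypothetical function (not in config.sh): the same precedence as the
# HADOOP_CONF_DIR block, with all inputs passed in explicitly.
resolve_hadoop_conf_dir() {
  local conf_dir="$1" hadoop_home="$2" hadoop_classpath="$3"
  local fallback="${4:-/etc/hadoop/conf}"

  # 1. An explicitly set HADOOP_CONF_DIR always wins.
  # 2. Otherwise derive it from HADOOP_HOME; the Hadoop 2.2+ layout is
  #    checked last, so it takes precedence when both directories exist.
  if [ -z "$conf_dir" ] && [ -n "$hadoop_home" ]; then
    if [ -d "$hadoop_home/conf" ]; then
      conf_dir="$hadoop_home/conf"
    fi
    if [ -d "$hadoop_home/etc/hadoop" ]; then
      conf_dir="$hadoop_home/etc/hadoop"
    fi
  fi

  # 3. The common default is used only when neither HADOOP_CONF_DIR
  #    nor HADOOP_CLASSPATH is available.
  if [ -z "$conf_dir" ] && [ -z "$hadoop_classpath" ] && [ -d "$fallback" ]; then
    conf_dir="$fallback"
  fi
  printf '%s\n' "$conf_dir"
}

# Demo with a throwaway directory laid out like a Hadoop 2.x home.
demo_home=$(mktemp -d)
mkdir -p "$demo_home/etc/hadoop"
resolve_hadoop_conf_dir "" "$demo_home" "" "$demo_home/no-such-dir"
resolve_hadoop_conf_dir "/custom/conf" "$demo_home" "" "$demo_home/no-such-dir"
rm -rf "$demo_home"
```

The first call prints the etc/hadoop directory under the demo home; the second prints /custom/conf because an explicit value is never overridden.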
INTERNAL_HADOOP_CLASSPATHS
INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
if [ -n "${HBASE_CONF_DIR}" ]; then
INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:${HBASE_CONF_DIR}"
fi
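How INTERNAL_HADOOP_CLASSPATHS ends up looking depends on which of the four variables are set. The sketch below assumes a hypothetical function build_internal_hadoop_classpaths that takes those variables as parameters; the sample paths are made up.

```shell
#!/usr/bin/env bash
# Hypothetical function (not in config.sh): the same string assembly as the
# INTERNAL_HADOOP_CLASSPATHS block.
build_internal_hadoop_classpaths() {
  local hadoop_classpath="$1" hadoop_conf_dir="$2" yarn_conf_dir="$3" hbase_conf_dir="$4"
  local result="${hadoop_classpath}:${hadoop_conf_dir}:${yarn_conf_dir}"
  # HBase configuration is appended only when HBASE_CONF_DIR is non-empty.
  if [ -n "$hbase_conf_dir" ]; then
    result="${result}:${hbase_conf_dir}"
  fi
  printf '%s\n' "$result"
}

build_internal_hadoop_classpaths "/opt/hadoop/lib/*" "/etc/hadoop/conf" "/etc/hadoop/conf" ""
build_internal_hadoop_classpaths "/opt/hadoop/lib/*" "/etc/hadoop/conf" "" "/etc/hbase/conf"
```

The second call shows that an unset variable still leaves its separator behind (an empty segment between two colons), because the three base variables are joined unconditionally.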
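Running java -cp starts a JVM; CliFrontend runs as that JVM process, and execution begins at its main method.
Note: -cp and -classpath are equivalent. Both specify the path on which the other classes that the class depends on are found.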
3) Running the entry class CliFrontend
In IntelliJ IDEA, press Shift + Shift and search for org.apache.flink.client.cli.CliFrontend
Press Ctrl + F12 and jump to main
/**
* Submits the job based on the arguments.
*/
public static void main(final String[] args) {
...
}
}
