当前位置:网站首页>Flink1.15源码阅读flink-clients之GenericCLI、flinkYarnSessionCLI和DefaultCLI
Flink1.15源码阅读flink-clients之GenericCLI、flinkYarnSessionCLI和DefaultCLI
2022-07-29 18:52:00 【京河小蚁】
文章目录
CliFrontend#main
进入到main方法,从命令行入手
// 3. load the custom command lines
final List<CustomCommandLine> customCommandLines =
loadCustomCommandLines(configuration, configurationDirectory);
将三种命令行对象按顺序添加到List
将三种命令行对象按顺序添加到List并返回命令行集合对象
public static List<CustomCommandLine> loadCustomCommandLines(
Configuration configuration, String configurationDirectory) {
// 声明空集合
List<CustomCommandLine> customCommandLines = new ArrayList<>();
// 将GenericCLI添加到集合中
customCommandLines.add(new GenericCLI(configuration, configurationDirectory));
// Command line interface of the YARN session, with a special initialization here
// to prefix all options with y/yarn.
final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
try {
// 将flinkYarnSessionCLI添加到集合中
customCommandLines.add(
loadCustomCommandLine(
flinkYarnSessionCLI,
configuration,
configurationDirectory,
"y",
"yarn"));
} catch (NoClassDefFoundError | Exception e) {
final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
try {
LOG.info("Loading FallbackYarnSessionCli");
customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
} catch (Exception exception) {
LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
}
}
// Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get
// the
// active CustomCommandLine in order and DefaultCLI isActive always return true.
// 最后将默认客户端添加到集合中
customCommandLines.add(new DefaultCLI());
// 返回值集合
return customCommandLines;
}
CliFrontend#run
先看下该方法的全部代码
protected void run(String[] args) throws Exception {
LOG.info("Running 'run-application' command.");
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final CommandLine commandLine = getCommandLine(commandOptions, args, true);
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRunApplication(customCommandLines);
return;
}
// 这里获取活跃的命令行,着重看这里
final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(checkNotNull(commandLine));
final ApplicationDeployer deployer =
new ApplicationClusterDeployer(clusterClientServiceLoader);
final ProgramOptions programOptions;
final Configuration effectiveConfiguration;
// No need to set a jarFile path for Pyflink job.
if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
effectiveConfiguration =
getEffectiveConfiguration(
activeCommandLine,
commandLine,
programOptions,
Collections.emptyList());
} else {
programOptions = new ProgramOptions(commandLine);
programOptions.validate();
final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
effectiveConfiguration =
getEffectiveConfiguration(
activeCommandLine,
commandLine,
programOptions,
Collections.singletonList(uri.toString()));
}
final ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(
programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
deployer.run(effectiveConfiguration, applicationConfiguration);
}
获取活跃的命令行
// 摘自上runApplication 方法
final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(checkNotNull(commandLine));
点击进去validateAndGetActiveCommandLine方法
/** * Gets the custom command-line for the arguments. * * @param commandLine The input to the command-line. * @return custom command-line which is active (may only be one at a time) */
public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
LOG.debug("Custom commandlines: {}", customCommandLines);
for (CustomCommandLine cli : customCommandLines) {
LOG.debug(
"Checking custom commandline {}, isActive: {}", cli, cli.isActive(commandLine));
if (cli.isActive(commandLine)) {
return cli;
}
}
throw new IllegalStateException("No valid command-line found.");
}
从上面注释中,很容易看出该方法是干什么的,获取活跃的命令行客户端;
for循环中顺序的取出之前存到集合中的三个客户端,根据cli.isActive(commandLine)这个方法检查,第一个活跃的客户端并立即返回,因为默认的客户端active总是true,从下面代码可以看出
1. GenericCLI#isActive
@Override
public boolean isActive(CommandLine commandLine) {
return configuration.getOptional(DeploymentOptions.TARGET).isPresent()
|| commandLine.hasOption(executorOption.getOpt())
|| commandLine.hasOption(targetOption.getOpt());
}
configuration.getOptional(DeploymentOptions.TARGET).isPresent() TARGET是否存在
executorOption.getOpt()

该配置是Generic CLItargetOption.getOpt()

上面三个条件满足一个即可。
2. flinkYarnSessionCLI#isActive

3. DefaultCLI#isActive
DefaultCLI#isActive总是true,所以默认客户端总是兜底的
@Override
public boolean isActive(CommandLine commandLine) {
// always active because we can try to read a JobManager address from the config
return true;
}
边栏推荐
猜你喜欢

2.5w字 + 36 张图爆肝操作系统面试题 学不吐你

centos8安装redis

MarkBERT

C# CLI(公共语言基础结构)

PromptBERT: Improving BERT Sentence Embeddings with Prompts

测试基础:Nosql数据库之Redis

MySQL 中的反斜杠 \\,我上当了

低代码搭建高效管理房屋检测系统案例分析

Function declaration and scope

First-line big factory software test interview questions and answer analysis, the strongest version of 2022...
随机推荐
牛客网刷题记录 || 指针
低代码三部曲之未来
搭建自己的以图搜图系统 (一):10 行代码以图搜图
MarkBERT
Typescript使用修饰器混合方法到类
etcd实现大规模服务治理应用实战
centos8安装redis
我用两行代码实现了一个数据库!
centos8安装mysql8.0.28
H265码流RTP封装方式详解
C#_OpenCV使用相机
【中标麒麟系统Your trial is EXPIRED and no VALID licens 关闭弹窗】
pytorch构建YOLOV7网络结构
pkg_resources.DistributionNotFound: The 'pip==1.4' distribution was not found and is required
无人驾驶技术有什么优点,人工驾驶的优缺点英文
R语言时间序列数据提取:使用xts包的last函数提取时间序列中最后面两周的数据(last 2 week)
一些投资的底线
word文档里插入图片显示不完整,只显示一半,怎么处理?
Canal实现Mysql数据增量同步更新至Mysql/Redis
2.5w字 + 36 张图爆肝操作系统面试题 学不吐你