当前位置:网站首页>Flink1.15源码阅读flink-clients之GenericCLI、flinkYarnSessionCLI和DefaultCLI
Flink1.15源码阅读flink-clients之GenericCLI、flinkYarnSessionCLI和DefaultCLI
2022-07-29 18:52:00 【京河小蚁】
文章目录
CliFrontend#main
进入到main方法,从命令行入手
// 3. load the custom command lines
final List<CustomCommandLine> customCommandLines =
loadCustomCommandLines(configuration, configurationDirectory);
将三种命令行对象按顺序添加到List
将三种命令行对象按顺序添加到List并返回命令行集合对象
public static List<CustomCommandLine> loadCustomCommandLines(
Configuration configuration, String configurationDirectory) {
// 声明空集合
List<CustomCommandLine> customCommandLines = new ArrayList<>();
// 将GenericCLI添加到集合中
customCommandLines.add(new GenericCLI(configuration, configurationDirectory));
// Command line interface of the YARN session, with a special initialization here
// to prefix all options with y/yarn.
final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
try {
// 将flinkYarnSessionCLI添加到集合中
customCommandLines.add(
loadCustomCommandLine(
flinkYarnSessionCLI,
configuration,
configurationDirectory,
"y",
"yarn"));
} catch (NoClassDefFoundError | Exception e) {
final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
try {
LOG.info("Loading FallbackYarnSessionCli");
customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
} catch (Exception exception) {
LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
}
}
// Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get
// the
// active CustomCommandLine in order and DefaultCLI isActive always return true.
// 最后将默认客户端添加到集合中
customCommandLines.add(new DefaultCLI());
// 返回值集合
return customCommandLines;
}
CliFrontend#run
先看下该方法的全部代码
protected void run(String[] args) throws Exception {
LOG.info("Running 'run-application' command.");
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final CommandLine commandLine = getCommandLine(commandOptions, args, true);
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRunApplication(customCommandLines);
return;
}
// 这里获取活跃的命令行,着重看这里
final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(checkNotNull(commandLine));
final ApplicationDeployer deployer =
new ApplicationClusterDeployer(clusterClientServiceLoader);
final ProgramOptions programOptions;
final Configuration effectiveConfiguration;
// No need to set a jarFile path for Pyflink job.
if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
effectiveConfiguration =
getEffectiveConfiguration(
activeCommandLine,
commandLine,
programOptions,
Collections.emptyList());
} else {
programOptions = new ProgramOptions(commandLine);
programOptions.validate();
final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
effectiveConfiguration =
getEffectiveConfiguration(
activeCommandLine,
commandLine,
programOptions,
Collections.singletonList(uri.toString()));
}
final ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(
programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
deployer.run(effectiveConfiguration, applicationConfiguration);
}
获取活跃的命令行
// 摘自上runApplication 方法
final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(checkNotNull(commandLine));
点击进去validateAndGetActiveCommandLine方法
/** * Gets the custom command-line for the arguments. * * @param commandLine The input to the command-line. * @return custom command-line which is active (may only be one at a time) */
public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
LOG.debug("Custom commandlines: {}", customCommandLines);
for (CustomCommandLine cli : customCommandLines) {
LOG.debug(
"Checking custom commandline {}, isActive: {}", cli, cli.isActive(commandLine));
if (cli.isActive(commandLine)) {
return cli;
}
}
throw new IllegalStateException("No valid command-line found.");
}
从上面注释中,很容易看出该方法是干什么的,获取活跃的命令行客户端;
for循环中顺序的取出之前存到集合中的三个客户端,根据cli.isActive(commandLine)这个方法检查,第一个活跃的客户端并立即返回,因为默认的客户端active总是true,从下面代码可以看出
1. GenericCLI#isActive
@Override
public boolean isActive(CommandLine commandLine) {
return configuration.getOptional(DeploymentOptions.TARGET).isPresent()
|| commandLine.hasOption(executorOption.getOpt())
|| commandLine.hasOption(targetOption.getOpt());
}
configuration.getOptional(DeploymentOptions.TARGET).isPresent() TARGET是否存在
executorOption.getOpt()

该配置是Generic CLItargetOption.getOpt()

上面三个条件满足一个即可。
2. flinkYarnSessionCLI#isActive

3. DefaultCLI#isActive
DefaultCLI#isActive总是true,所以默认客户端总是兜底的
@Override
public boolean isActive(CommandLine commandLine) {
// always active because we can try to read a JobManager address from the config
return true;
}
边栏推荐
猜你喜欢

Win11任务栏太宽了怎么变窄?Win11任务栏宽度调整方法
![Chapter 02 MySQL Data Directory [1. MySQL Architecture] [MySQL Advanced]](/img/be/e9e910b29f51100f18a063c48c7db0.png)
Chapter 02 MySQL Data Directory [1. MySQL Architecture] [MySQL Advanced]

大疆MID 360

MarkBERT

31个!Golang常用工具来啦(建议收藏)

TDengine 落地协鑫能科,数百亿数据压缩至 600GB

H264码流RTP封装方式详解

MySQL 中的反斜杠 \\,我上当了

Embedded Development: Embedded Fundamentals - Software Error Classification

总数据量超万亿行,玉溪卷烟厂通过正确选择时序数据库轻松应对
随机推荐
R语言时间序列数据提取:使用xts包的first函数提取时间序列中最前面两周的数据(first 2 week)
数字孪生万物可视 | 联接现实世界与数字空间
Answer these 3 interview questions correctly, and the salary will go up by 20K
ECCV 2022 | AirDet:无需微调的小样本目标检测方法
error TS1219: Experimental support for decorators解决
真·摸鱼带师:程序员小哥每天工作10分钟年薪57万,我破防了...
Win11网络不稳定怎么办?Win11连接wifi频繁掉线的解决方法
Gesture password unlock WeChat applet project source code
Low code of the trilogy
TDengine 助力西门子轻量级数字化解决方案
High-speed passive link impedance matching routine
C language advanced enumeration and joint
线程池 ThreadPoolExecutor 详解
C # CLI (common language infrastructure)
我用两行代码实现了一个数据库!
Small programs use npm packages
exdark数据集转yolo格式(仅供参考)
Typescript使用修饰器混合方法到类
【技术课堂】从批到流:pull or not pull, that's a question
峰会(暑假每日一题 8)