当前位置:网站首页>Flink1.15源码阅读flink-clients之GenericCLI、flinkYarnSessionCLI和DefaultCLI
Flink1.15源码阅读flink-clients之GenericCLI、flinkYarnSessionCLI和DefaultCLI
2022-07-29 18:52:00 【京河小蚁】
文章目录
CliFrontend#main
进入到main方法,从命令行入手
// 3. load the custom command lines
final List<CustomCommandLine> customCommandLines =
loadCustomCommandLines(configuration, configurationDirectory);
将三种命令行对象按顺序添加到List
将三种命令行对象按顺序添加到List并返回命令行集合对象
public static List<CustomCommandLine> loadCustomCommandLines(
Configuration configuration, String configurationDirectory) {
// 声明空集合
List<CustomCommandLine> customCommandLines = new ArrayList<>();
// 将GenericCLI添加到集合中
customCommandLines.add(new GenericCLI(configuration, configurationDirectory));
// Command line interface of the YARN session, with a special initialization here
// to prefix all options with y/yarn.
final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
try {
// 将flinkYarnSessionCLI添加到集合中
customCommandLines.add(
loadCustomCommandLine(
flinkYarnSessionCLI,
configuration,
configurationDirectory,
"y",
"yarn"));
} catch (NoClassDefFoundError | Exception e) {
final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
try {
LOG.info("Loading FallbackYarnSessionCli");
customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
} catch (Exception exception) {
LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
}
}
// Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get
// the
// active CustomCommandLine in order and DefaultCLI isActive always return true.
// 最后将默认客户端添加到集合中
customCommandLines.add(new DefaultCLI());
// 返回值集合
return customCommandLines;
}
CliFrontend#run
先看下该方法的全部代码
protected void run(String[] args) throws Exception {
LOG.info("Running 'run-application' command.");
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final CommandLine commandLine = getCommandLine(commandOptions, args, true);
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRunApplication(customCommandLines);
return;
}
// 这里获取活跃的命令行,着重看这里
final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(checkNotNull(commandLine));
final ApplicationDeployer deployer =
new ApplicationClusterDeployer(clusterClientServiceLoader);
final ProgramOptions programOptions;
final Configuration effectiveConfiguration;
// No need to set a jarFile path for Pyflink job.
if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
effectiveConfiguration =
getEffectiveConfiguration(
activeCommandLine,
commandLine,
programOptions,
Collections.emptyList());
} else {
programOptions = new ProgramOptions(commandLine);
programOptions.validate();
final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
effectiveConfiguration =
getEffectiveConfiguration(
activeCommandLine,
commandLine,
programOptions,
Collections.singletonList(uri.toString()));
}
final ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(
programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
deployer.run(effectiveConfiguration, applicationConfiguration);
}
获取活跃的命令行
// 摘自上runApplication 方法
final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(checkNotNull(commandLine));
点击进去validateAndGetActiveCommandLine方法
/** * Gets the custom command-line for the arguments. * * @param commandLine The input to the command-line. * @return custom command-line which is active (may only be one at a time) */
public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
LOG.debug("Custom commandlines: {}", customCommandLines);
for (CustomCommandLine cli : customCommandLines) {
LOG.debug(
"Checking custom commandline {}, isActive: {}", cli, cli.isActive(commandLine));
if (cli.isActive(commandLine)) {
return cli;
}
}
throw new IllegalStateException("No valid command-line found.");
}
从上面注释中,很容易看出该方法是干什么的,获取活跃的命令行客户端;
for循环中顺序的取出之前存到集合中的三个客户端,根据cli.isActive(commandLine)这个方法检查,第一个活跃的客户端并立即返回,因为默认的客户端active总是true,从下面代码可以看出
1. GenericCLI#isActive
@Override
public boolean isActive(CommandLine commandLine) {
return configuration.getOptional(DeploymentOptions.TARGET).isPresent()
|| commandLine.hasOption(executorOption.getOpt())
|| commandLine.hasOption(targetOption.getOpt());
}
configuration.getOptional(DeploymentOptions.TARGET).isPresent() TARGET是否存在
executorOption.getOpt()

该配置是Generic CLItargetOption.getOpt()

上面三个条件满足一个即可。
2. flinkYarnSessionCLI#isActive

3. DefaultCLI#isActive
DefaultCLI#isActive总是true,所以默认客户端总是兜底的
@Override
public boolean isActive(CommandLine commandLine) {
// always active because we can try to read a JobManager address from the config
return true;
}
边栏推荐
猜你喜欢
随机推荐
MySQL 中的反斜杠 \\,我上当了
2022暑假 动态规划总结
从零在AutoDL调试一份目标检测代码
软件测试面试屡屡失败,面试官总是说逻辑思维混乱,怎么办?
Setting right:0 after sticky positioning does not take effect
centos8安装redis
H264码流RTP封装方式详解
h264和h265视频流SDP描述详解
经验分享|编写简单易用的在线产品手册小妙招
MarkBERT
Small application components
R语言使用xts包表示时间序列数据(time series data)
活动回顾 | 大咖云集“开源安全治理模型和工具”线上研讨会
OpenCV - 图像二值化处理 腐蚀膨胀 边缘检测 轮廓识别
function arguments
大疆MID 360
要卖课、要带货,知识付费系统帮你一步搞定!
大中型网站列表页翻页过多怎么优化?
ECCV 2022 | AirDet:无需微调的小样本目标检测方法
Win11网络不稳定怎么办?Win11连接wifi频繁掉线的解决方法









