当前位置:网站首页>Flink Yarn Per Job - 启动AM
Flink Yarn Per Job - 启动AM
2022-08-03 09:01:00 【hyunbar】

启动AM
YarnClusterDescriptor
/**
 * Excerpt of the deployment entry point in YarnClusterDescriptor.
 *
 * <p>Only the call that starts the ApplicationMaster (AM) is shown; the code before and
 * after it is elided with {@code ...} in this excerpt. The visible part forwards the Flink
 * configuration, application name, cluster entrypoint class name, job graph, YARN client,
 * YARN application and the validated cluster specification to {@code startAppMaster} and
 * keeps the returned {@code ApplicationReport} in {@code report}.
 *
 * @param clusterSpecification cluster specification (the visible call passes
 *     {@code validClusterSpecification}, which is produced in the elided part)
 * @param applicationName name handed to {@code startAppMaster}
 * @param yarnClusterEntrypoint entrypoint class name handed to {@code startAppMaster}
 * @param jobGraph job graph handed to {@code startAppMaster}; may be {@code null}
 * @param detached not used in the visible part of this excerpt
 * @throws Exception declared by the method; the throwing code is not visible here
 */
private ClusterClientProvider<ApplicationId> deployInternal(
ClusterSpecification clusterSpecification,
String applicationName,
String yarnClusterEntrypoint,
@Nullable JobGraph jobGraph,
boolean detached) throws Exception {
...
/* TODO: start launching the AM (ApplicationMaster) */
ApplicationReport report = startAppMaster(
flinkConfiguration,
applicationName,
yarnClusterEntrypoint,
jobGraph,
yarnClient,
yarnApplication,
validClusterSpecification);
...
}
private ApplicationReport startAppMaster( Configuration configuration, String applicationName, String yarnClusterEntrypoint, JobGraph jobGraph, YarnClient yarnClient, YarnClientApplication yarnApplication, ClusterSpecification clusterSpecification) throws Exception { // ------------------ Initialize the file systems ------------------------- 1.0 初始化、创建 Hadoop的 FileSystem org.apache.flink.core.fs.FileSystem.initialize( configuration, PluginUtils.createPluginManagerFromRootFolder(configuration)); final FileSystem fs = FileSystem.get(yarnConfiguration); ... ... ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext(); final List<Path> providedLibDirs = Utils.getQualifiedRemoteSharedPaths(configuration, yarnConfiguration); 2.0 Yarn应用的文件上传器:FS、对应的HDFS路径 2.0 用来上传:用户jar包、flink的依赖、flink的配置文件 2.0 直接 fileUploader.close()
final YarnApplicationFileUploader fileUploader = YarnApplicationFileUploader.from( fs, getStagingDir(fs), providedLibDirs, appContext.getApplicationId(), getFileReplication());
... ... // Set-up ApplicationSubmissionContext for the application final ApplicationId appId = appContext.getApplicationId(); ... ... configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace); 3.0 高可用配置:重试次数,默认2次 3.0 DEFAULT_RM_AM_MAX_ATTEMPTS=2 if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) { // activate re-execution of failed applications appContext.setMaxAppAttempts( configuration.getInteger( YarnConfigOptions.APPLICATION_ATTEMPTS.key(), YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); activateHighAvailabilitySupport(appContext); } else { // set number of application retries to 1 in the default case 3.1 不是高可用重试次数为1次 appContext.setMaxAppAttempts( configuration.getInteger( YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1)); } 4.0 添加用户jar包 final Set<Path> userJarFiles = new HashSet<>(); if (jobGraph != null) { userJarFiles.addAll(jobGraph.getUserJars().stream().map(f -> f.toUri()).map(Path::new).collect(Collectors.toSet())); } final List<URI> jarUrls = ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create); if (jarUrls != null && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) { userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet())); } ... ... // upload and register ship-only files // Plugin files only need to be shipped and should not be added to classpath. 
plugins目录下的文件 if (providedLibDirs == null || providedLibDirs.isEmpty()) { Set<File> shipOnlyFiles = new HashSet<>(); addPluginsFoldersToShipFiles(shipOnlyFiles); fileUploader.registerMultipleLocalResources( shipOnlyFiles.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()), Path.CUR_DIR, LocalResourceType.FILE); } if (!shipArchives.isEmpty()) { fileUploader.registerMultipleLocalResources( shipArchives.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()), Path.CUR_DIR, LocalResourceType.ARCHIVE); } 5.0 Upload and register user jars 上传和注册用户jar包 final List<String> userClassPaths = fileUploader.registerMultipleLocalResources( userJarFiles, userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED ? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR : Path.CUR_DIR, LocalResourceType.FILE); if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) { systemClassPaths.addAll(userClassPaths); } // normalize classpath by sorting Collections.sort(systemClassPaths); Collections.sort(userClassPaths); // classpath assembler StringBuilder classPathBuilder = new StringBuilder(); if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) { for (String userClassPath : userClassPaths) { classPathBuilder.append(userClassPath).append(File.pathSeparator); } } for (String classPath : systemClassPaths) { classPathBuilder.append(classPath).append(File.pathSeparator); } // Setup jar for ApplicationMaster final YarnLocalResourceDescriptor localResourceDescFlinkJar = fileUploader.uploadFlinkDist(flinkJarPath); classPathBuilder.append(localResourceDescFlinkJar.getResourceKey()).append(File.pathSeparator); // write job graph to tmp file and add it to local resource // 将jobGraph写入到 临时文件,并且添加到本地资源 // TODO: server use user main method to generate job graph // 该服务用用户类的main方法生成JobGraph if (jobGraph != null) { File tmpJobGraphFile = null; try { tmpJobGraphFile = File.createTempFile(appId.toString(), null); try (FileOutputStream output = new 
FileOutputStream(tmpJobGraphFile); ObjectOutputStream obOutput = new ObjectOutputStream(output)) { obOutput.writeObject(jobGraph); } final String jobGraphFilename = "job.graph"; configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename); fileUploader.registerSingleLocalResource( jobGraphFilename, new Path(tmpJobGraphFile.toURI()), "", LocalResourceType.FILE, true, false); classPathBuilder.append(jobGraphFilename).append(File.pathSeparator); } catch (Exception e) { LOG.warn("Add job graph to local resource fail."); throw e; } finally { if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) { LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath()); } } } 6.0 Upload the flink configuration 6.0 write out configuration file // 上传Flink的配置文件 - flink-conf.yaml File tmpConfigurationFile = null; try { tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null); BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile); String flinkConfigKey = "flink-conf.yaml"; fileUploader.registerSingleLocalResource( flinkConfigKey, new Path(tmpConfigurationFile.getAbsolutePath()), "", LocalResourceType.FILE, true, true); classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator); } finally { if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) { LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath()); } } if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) { for (String userClassPath : userClassPaths) { classPathBuilder.append(userClassPath).append(File.pathSeparator); } } ... ... 7.0 jobmanager内存配置 TOTAL_PROCESS_MEMORY=jobmanager.memory.process.size final JobManagerProcessSpec processSpec = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap( flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY); final ContainerLaunchContext amContainer = setupApplicationMasterContainer( yarnClusterEntrypoint, hasKrb5, processSpec);
amContainer.setLocalResources(fileUploader.getRegisteredLocalResources()); 8.0 关闭上传器 fileUploader.close(); // Setup CLASSPATH and environment variables for ApplicationMaster 9.0 创建Map,用来存储 AM的环境变量和类路径 final Map<String, String> appMasterEnv = new HashMap<>(); 9.1 set user specified app master environment variables appMasterEnv.putAll( ConfigurationUtils.getPrefixedKeyValuePairs(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration)); 9.2 set Flink app class path appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString()); 9.3 set Flink on YARN internal configuration values appMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR, localResourceDescFlinkJar.toString()); appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString()); appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fileUploader.getHomeDir().toString()); appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, encodeYarnLocalResourceDescriptorListToString(fileUploader.getEnvShipResourceList())); appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace()); appMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES, fileUploader.getApplicationDir().toUri().toString()); // https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName()); if (localizedKeytabPath != null) { appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath); String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL); appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal); if (remotePathKeytab != null) { appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString()); } } ... ... 
10.0 将之前封装的 Map(AM的环境信息、类路径),设置到容器里 amContainer.setEnvironment(appMasterEnv); // Set up resource type requirements for ApplicationMaster Resource capability = Records.newRecord(Resource.class); capability.setMemory(clusterSpecification.getMasterMemoryMB()); capability.setVirtualCores(flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES)); final String customApplicationName = customName != null ? customName : applicationName; appContext.setApplicationName(customApplicationName); appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink"); appContext.setAMContainerSpec(amContainer); appContext.setResource(capability); // Set priority for application int priorityNum = flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY); if (priorityNum >= 0) { Priority priority = Priority.newInstance(priorityNum); appContext.setPriority(priority); } if (yarnQueue != null) { appContext.setQueue(yarnQueue); } setApplicationNodeLabel(appContext); setApplicationTags(appContext); // add a hook to clean up in case deployment fails Thread deploymentFailureHook = new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir()); Runtime.getRuntime().addShutdownHook(deploymentFailureHook); LOG.info("Submitting application master " + appId); 11.0 可以提交应用 yarnClient.submitApplication(appContext); LOG.info("Waiting for the cluster to be allocated"); final long startTime = System.currentTimeMillis(); ApplicationReport report; YarnApplicationState lastAppState = YarnApplicationState.NEW; ... ... } // since deployment was successful, remove the hook ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG); return report; } |
startAppMaster主要功能:
初始化、创建 Hadoop的 FileSystem
Yarn应用的文件上传器:FS对应的HDFS路径;用来上传:用户jar包、flink的依赖、flink的配置文件
高可用配置:重试次数,默认2次(DEFAULT_RM_AM_MAX_ATTEMPTS=2);非高可用模式下重试次数默认为1次(两者均可通过 YarnConfigOptions.APPLICATION_ATTEMPTS 覆盖)
添加用户jar包, plugins目录下的文件
上传和注册用户jar包
将jobGraph写入到 临时文件,并且添加到本地资源
(源码中的 TODO,尚未实现)由服务端调用用户类的main方法生成JobGraph
上传Flink的配置文件 - flink-conf.yaml
jobmanager内存配置TOTAL_PROCESS_MEMORY=jobmanager.memory.process.size
创建Map,用来存储 AM的环境变量和类路径
将之前封装的 Map(AM的环境信息、类路径),设置到容器里
提交应用
JM的内存配置
YarnClusterDescriptor
// Derive the JobManager process spec from flinkConfiguration; TOTAL_PROCESS_MEMORY is passed
// as the "new option to interpret legacy heap" argument of the callee.
final JobManagerProcessSpec processSpec = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
flinkConfiguration,
JobManagerOptions.TOTAL_PROCESS_MEMORY);
JobManagerProcessUtils
/**
 * Builds a {@code JobManagerProcessSpec} from the given configuration.
 *
 * <p>The configuration is first passed, together with {@code newOptionToInterpretLegacyHeap},
 * through {@code getConfigurationWithLegacyHeapSizeMappedToNewConfigOption}; the result is
 * then handed to {@code processSpecFromConfig}. Judging by its name, the helper maps a legacy
 * heap-size setting onto the given new option. The helper is not shown here, so confirm
 * against its source.
 *
 * @param config configuration to read the memory settings from
 * @param newOptionToInterpretLegacyHeap option forwarded to the legacy-heap mapping helper
 * @return the process spec produced by {@code processSpecFromConfig}
 * @throws IllegalConfigurationException if the delegated calls throw one; it is rethrown with
 *     the message prefixed by "JobManager memory configuration failed: " and the original
 *     exception attached as the cause
 */
public static JobManagerProcessSpec processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
Configuration config,
ConfigOption<MemorySize> newOptionToInterpretLegacyHeap) {
try {
return processSpecFromConfig(
getConfigurationWithLegacyHeapSizeMappedToNewConfigOption(
config,
newOptionToInterpretLegacyHeap));
} catch (IllegalConfigurationException e) {
// Re-wrap so the message names the JobManager; the original exception stays as the cause.
throw new IllegalConfigurationException("JobManager memory configuration failed: " + e.getMessage(), e);
}
}
/**
 * Creates a {@code JobManagerProcessSpec} from {@code config} by asking
 * {@code PROCESS_MEMORY_UTILS.memoryProcessSpecFromConfig} for the common memory spec and
 * converting it with {@code createMemoryProcessSpec}.
 *
 * @param config configuration to derive the memory spec from
 * @return the converted JobManager process spec
 */
static JobManagerProcessSpec processSpecFromConfig(Configuration config) {
return createMemoryProcessSpec(PROCESS_MEMORY_UTILS.memoryProcessSpecFromConfig(config));
}
/**
 * Wraps a common process memory spec into a {@code JobManagerProcessSpec}, passing on its
 * Flink memory and its JVM metaspace-and-overhead parts.
 *
 * @param processMemory common memory spec whose two components are copied over
 * @return a new {@code JobManagerProcessSpec} built from those two components
 */
private static JobManagerProcessSpec createMemoryProcessSpec(
CommonProcessMemorySpec<JobManagerFlinkMemory> processMemory) {
return new JobManagerProcessSpec(processMemory.getFlinkMemory(), processMemory.getJvmMetaspaceAndOverhead());
}
JobManagerProcessSpec extends CommonProcessMemorySpec
/** |
CommonProcessMemorySpec
父类构造器为flinkMemory和jvmMetaspaceAndOverhead赋值
/**
 * Process memory spec composed of a Flink memory part and a JVM metaspace-and-overhead part.
 *
 * <p>Only the fields and the constructor are shown in this excerpt; both fields are final
 * and assigned once by the constructor.
 *
 * @param <FM> concrete {@code FlinkMemory} type held by this spec
 */
public class CommonProcessMemorySpec<FM extends FlinkMemory> implements ProcessMemorySpec {
private static final long serialVersionUID = 1L;
// Flink memory component of the process.
private final FM flinkMemory;
// JVM metaspace and overhead component of the process.
private final JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead;
/**
 * Stores the two memory components; protected, so it is called from subclasses
 * (or from the same package).
 *
 * @param flinkMemory Flink memory component
 * @param jvmMetaspaceAndOverhead JVM metaspace and overhead component
 */
protected CommonProcessMemorySpec(FM flinkMemory, JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead) {
this.flinkMemory = flinkMemory;
this.jvmMetaspaceAndOverhead = jvmMetaspaceAndOverhead;
}
}
amContainer配置
YarnClusterDescriptor
// Build the AM container launch context from the entrypoint class name, the hasKrb5 flag
// and the JobManager process spec computed earlier.
final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
yarnClusterEntrypoint,
hasKrb5,
processSpec);
YarnClusterDescriptor
ContainerLaunchContext setupApplicationMasterContainer( |

边栏推荐
猜你喜欢
随机推荐
行业 SaaS 微服务稳定性保障实战
多线程下的单例模式
What are pseudo-classes and pseudo-elements?The difference between pseudo-classes and pseudo-elements
English Grammar - Adverbial Clauses
数仓4.0(二)------ 业务数据采集平台
10分钟带你入门chrome(谷歌)浏览器插件开发
SQL Daily Practice (Nioke New Question Bank) - Day 5: Advanced Query
Alibaba Cloud SMS Sending
pytorch one-hot 小技巧
CSP-S2019 Day2
SQL每日一练(牛客新题库)——第5天:高级查询
行业洞察 | 如何更好的实现与虚拟人的互动体验?
The window of the chosen data flow
MySQL-存储过程-函数-
The display of the article list and the basics of creating articles and article details
Eject stubborn hard drives with diskpart's offline command
php中去重二维数组
Qt 下拉复选框(MultiSelectComboBox)(一) 实现下拉框多选,搜索下拉框内容
浅析什么是伪类和伪元素?伪类和伪元素的区别解析
【LeetCode】226. Flip the binary tree









