当前位置:网站首页>Flink Yarn Per Job - 启动AM
Flink Yarn Per Job - 启动AM
2022-08-03 09:01:00 【hyunbar】

启动AM
YarnClusterDescriptor
// Deploys a Flink cluster on YARN and returns a provider for the cluster client.
// This excerpt shows only the call into startAppMaster(); the surrounding
// validation/setup logic is elided ("...") by the article.
// NOTE(review): flinkConfiguration, yarnClient, yarnApplication and
// validClusterSpecification are produced by the elided code — not visible here.
private ClusterClientProvider<ApplicationId> deployInternal(
ClusterSpecification clusterSpecification,
String applicationName,
String yarnClusterEntrypoint,
@Nullable JobGraph jobGraph,
boolean detached) throws Exception {
...
/*TODO start launching the ApplicationMaster*/
ApplicationReport report = startAppMaster(
flinkConfiguration,
applicationName,
yarnClusterEntrypoint,
jobGraph,
yarnClient,
yarnApplication,
validClusterSpecification);
...
}
private ApplicationReport startAppMaster( Configuration configuration, String applicationName, String yarnClusterEntrypoint, JobGraph jobGraph, YarnClient yarnClient, YarnClientApplication yarnApplication, ClusterSpecification clusterSpecification) throws Exception { // ------------------ Initialize the file systems ------------------------- 1.0 初始化、创建 Hadoop的 FileSystem org.apache.flink.core.fs.FileSystem.initialize( configuration, PluginUtils.createPluginManagerFromRootFolder(configuration)); final FileSystem fs = FileSystem.get(yarnConfiguration); ... ... ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext(); final List<Path> providedLibDirs = Utils.getQualifiedRemoteSharedPaths(configuration, yarnConfiguration); 2.0 Yarn应用的文件上传器:FS、对应的HDFS路径 2.0 用来上传:用户jar包、flink的依赖、flink的配置文件 2.0 直接 fileUploader.close()
final YarnApplicationFileUploader fileUploader = YarnApplicationFileUploader.from( fs, getStagingDir(fs), providedLibDirs, appContext.getApplicationId(), getFileReplication());
... ... // Set-up ApplicationSubmissionContext for the application final ApplicationId appId = appContext.getApplicationId(); ... ... configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace); 3.0 高可用配置:重试次数,默认2次 3.0 DEFAULT_RM_AM_MAX_ATTEMPTS=2 if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) { // activate re-execution of failed applications appContext.setMaxAppAttempts( configuration.getInteger( YarnConfigOptions.APPLICATION_ATTEMPTS.key(), YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); activateHighAvailabilitySupport(appContext); } else { // set number of application retries to 1 in the default case 3.1 不是高可用重试次数为1次 appContext.setMaxAppAttempts( configuration.getInteger( YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1)); } 4.0 添加用户jar包 final Set<Path> userJarFiles = new HashSet<>(); if (jobGraph != null) { userJarFiles.addAll(jobGraph.getUserJars().stream().map(f -> f.toUri()).map(Path::new).collect(Collectors.toSet())); } final List<URI> jarUrls = ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create); if (jarUrls != null && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) { userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet())); } ... ... // upload and register ship-only files // Plugin files only need to be shipped and should not be added to classpath. 
plugins目录下的文件 if (providedLibDirs == null || providedLibDirs.isEmpty()) { Set<File> shipOnlyFiles = new HashSet<>(); addPluginsFoldersToShipFiles(shipOnlyFiles); fileUploader.registerMultipleLocalResources( shipOnlyFiles.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()), Path.CUR_DIR, LocalResourceType.FILE); } if (!shipArchives.isEmpty()) { fileUploader.registerMultipleLocalResources( shipArchives.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()), Path.CUR_DIR, LocalResourceType.ARCHIVE); } 5.0 Upload and register user jars 上传和注册用户jar包 final List<String> userClassPaths = fileUploader.registerMultipleLocalResources( userJarFiles, userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED ? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR : Path.CUR_DIR, LocalResourceType.FILE); if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) { systemClassPaths.addAll(userClassPaths); } // normalize classpath by sorting Collections.sort(systemClassPaths); Collections.sort(userClassPaths); // classpath assembler StringBuilder classPathBuilder = new StringBuilder(); if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) { for (String userClassPath : userClassPaths) { classPathBuilder.append(userClassPath).append(File.pathSeparator); } } for (String classPath : systemClassPaths) { classPathBuilder.append(classPath).append(File.pathSeparator); } // Setup jar for ApplicationMaster final YarnLocalResourceDescriptor localResourceDescFlinkJar = fileUploader.uploadFlinkDist(flinkJarPath); classPathBuilder.append(localResourceDescFlinkJar.getResourceKey()).append(File.pathSeparator); // write job graph to tmp file and add it to local resource // 将jobGraph写入到 临时文件,并且添加到本地资源 // TODO: server use user main method to generate job graph // 该服务用用户类的main方法生成JobGraph if (jobGraph != null) { File tmpJobGraphFile = null; try { tmpJobGraphFile = File.createTempFile(appId.toString(), null); try (FileOutputStream output = new 
FileOutputStream(tmpJobGraphFile); ObjectOutputStream obOutput = new ObjectOutputStream(output)) { obOutput.writeObject(jobGraph); } final String jobGraphFilename = "job.graph"; configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename); fileUploader.registerSingleLocalResource( jobGraphFilename, new Path(tmpJobGraphFile.toURI()), "", LocalResourceType.FILE, true, false); classPathBuilder.append(jobGraphFilename).append(File.pathSeparator); } catch (Exception e) { LOG.warn("Add job graph to local resource fail."); throw e; } finally { if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) { LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath()); } } } 6.0 Upload the flink configuration 6.0 write out configuration file // 上传Flink的配置文件 - flink-conf.yaml File tmpConfigurationFile = null; try { tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null); BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile); String flinkConfigKey = "flink-conf.yaml"; fileUploader.registerSingleLocalResource( flinkConfigKey, new Path(tmpConfigurationFile.getAbsolutePath()), "", LocalResourceType.FILE, true, true); classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator); } finally { if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) { LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath()); } } if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) { for (String userClassPath : userClassPaths) { classPathBuilder.append(userClassPath).append(File.pathSeparator); } } ... ... 7.0 jobmanager内存配置 TOTAL_PROCESS_MEMORY=jobmanager.memory.process.size final JobManagerProcessSpec processSpec = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap( flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY); final ContainerLaunchContext amContainer = setupApplicationMasterContainer( yarnClusterEntrypoint, hasKrb5, processSpec);
amContainer.setLocalResources(fileUploader.getRegisteredLocalResources()); 8.0 关闭上传器 fileUploader.close(); // Setup CLASSPATH and environment variables for ApplicationMaster 9.0 创建Map,用来存储 AM的环境变量和类路径 final Map<String, String> appMasterEnv = new HashMap<>(); 9.1 set user specified app master environment variables appMasterEnv.putAll( ConfigurationUtils.getPrefixedKeyValuePairs(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration)); 9.2 set Flink app class path appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString()); 9.3 set Flink on YARN internal configuration values appMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR, localResourceDescFlinkJar.toString()); appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString()); appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fileUploader.getHomeDir().toString()); appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, encodeYarnLocalResourceDescriptorListToString(fileUploader.getEnvShipResourceList())); appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace()); appMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES, fileUploader.getApplicationDir().toUri().toString()); // https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName()); if (localizedKeytabPath != null) { appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath); String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL); appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal); if (remotePathKeytab != null) { appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString()); } } ... ... 
10.0 将之前封装的 Map(AM的环境信息、类路径),设置到容器里 amContainer.setEnvironment(appMasterEnv); // Set up resource type requirements for ApplicationMaster Resource capability = Records.newRecord(Resource.class); capability.setMemory(clusterSpecification.getMasterMemoryMB()); capability.setVirtualCores(flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES)); final String customApplicationName = customName != null ? customName : applicationName; appContext.setApplicationName(customApplicationName); appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink"); appContext.setAMContainerSpec(amContainer); appContext.setResource(capability); // Set priority for application int priorityNum = flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY); if (priorityNum >= 0) { Priority priority = Priority.newInstance(priorityNum); appContext.setPriority(priority); } if (yarnQueue != null) { appContext.setQueue(yarnQueue); } setApplicationNodeLabel(appContext); setApplicationTags(appContext); // add a hook to clean up in case deployment fails Thread deploymentFailureHook = new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir()); Runtime.getRuntime().addShutdownHook(deploymentFailureHook); LOG.info("Submitting application master " + appId); 11.0 可以提交应用 yarnClient.submitApplication(appContext); LOG.info("Waiting for the cluster to be allocated"); final long startTime = System.currentTimeMillis(); ApplicationReport report; YarnApplicationState lastAppState = YarnApplicationState.NEW; ... ... } // since deployment was successful, remove the hook ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG); return report; } |
startAppMaster主要功能:
初始化、创建 Hadoop的 FileSystem
Yarn应用的文件上传器:FS对应的HDFS路径;用来上传:用户jar包、flink的依赖、flink的配置文件
高可用配置:重试次数,默认2次(DEFAULT_RM_AM_MAX_ATTEMPTS=2)。不是高可用重试次数为1次
添加用户jar包, plugins目录下的文件
上传和注册用户jar包
将jobGraph写入到 临时文件,并且添加到本地资源
该服务用用户类的main方法生成JobGraph
上传Flink的配置文件 - flink-conf.yaml
jobmanager内存配置TOTAL_PROCESS_MEMORY=jobmanager.memory.process.size
创建Map,用来存储 AM的环境变量和类路径
将之前封装的 Map(AM的环境信息、类路径),设置到容器里
提交应用
JM的内存配置
YarnClusterDescriptor
final JobManagerProcessSpec processSpec = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
flinkConfiguration,
JobManagerOptions.TOTAL_PROCESS_MEMORY);
JobManagerProcessUtils
/**
 * Derives the JobManager's process memory spec from {@code config}.
 *
 * <p>The legacy heap-size setting is first mapped onto
 * {@code newOptionToInterpretLegacyHeap} (e.g. {@code jobmanager.memory.process.size}),
 * then the full spec is computed from the adjusted configuration.
 *
 * @param config the configuration to read memory settings from
 * @param newOptionToInterpretLegacyHeap option under which the legacy heap size is interpreted
 * @return the derived {@link JobManagerProcessSpec}
 * @throws IllegalConfigurationException if the memory configuration is invalid,
 *         re-thrown with a JobManager-specific message prefix
 */
public static JobManagerProcessSpec processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
        Configuration config,
        ConfigOption<MemorySize> newOptionToInterpretLegacyHeap) {
    try {
        // Mapping stays inside the try so its configuration errors get the same wrapping.
        final Configuration adjustedConfig =
                getConfigurationWithLegacyHeapSizeMappedToNewConfigOption(
                        config, newOptionToInterpretLegacyHeap);
        return processSpecFromConfig(adjustedConfig);
    } catch (IllegalConfigurationException e) {
        // Preserve the cause and prefix the message with the failing component.
        throw new IllegalConfigurationException(
                "JobManager memory configuration failed: " + e.getMessage(), e);
    }
}
/**
 * Computes the raw process-memory breakdown from {@code config} and wraps it
 * into a {@link JobManagerProcessSpec}.
 */
static JobManagerProcessSpec processSpecFromConfig(Configuration config) {
    final CommonProcessMemorySpec<JobManagerFlinkMemory> memorySpec =
            PROCESS_MEMORY_UTILS.memoryProcessSpecFromConfig(config);
    return createMemoryProcessSpec(memorySpec);
}
/**
 * Splits the generic process-memory spec into its two parts — the Flink memory
 * breakdown and the JVM metaspace/overhead — and builds a
 * {@link JobManagerProcessSpec} from them.
 */
private static JobManagerProcessSpec createMemoryProcessSpec(
        CommonProcessMemorySpec<JobManagerFlinkMemory> processMemory) {
    final JobManagerFlinkMemory flinkMemory = processMemory.getFlinkMemory();
    final JvmMetaspaceAndOverhead metaspaceAndOverhead =
            processMemory.getJvmMetaspaceAndOverhead();
    return new JobManagerProcessSpec(flinkMemory, metaspaceAndOverhead);
}
JobManagerProcessSpec extends CommonProcessMemorySpec
/** ... */
CommonProcessMemorySpec
父类构造器为flinkMemory和jvmMetaspaceAndOverhead赋值
/**
 * Common process-memory spec: pairs a Flink memory breakdown with the JVM
 * metaspace-and-overhead sizes. The protected constructor simply stores both
 * parts; subclasses (e.g. JobManagerProcessSpec) delegate to it.
 *
 * @param <FM> the concrete {@code FlinkMemory} breakdown type
 */
public class CommonProcessMemorySpec<FM extends FlinkMemory> implements ProcessMemorySpec {
private static final long serialVersionUID = 1L;
// The Flink-managed memory breakdown of the process.
private final FM flinkMemory;
// JVM metaspace plus JVM overhead sizes.
private final JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead;
protected CommonProcessMemorySpec(FM flinkMemory, JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead) {
this.flinkMemory = flinkMemory;
this.jvmMetaspaceAndOverhead = jvmMetaspaceAndOverhead;
}
}
amContainer配置
YarnClusterDescriptor
final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
yarnClusterEntrypoint,
hasKrb5,
processSpec);
YarnClusterDescriptor
ContainerLaunchContext setupApplicationMasterContainer( ...

边栏推荐
猜你喜欢
随机推荐
关于Unity,Laya学习,第一步加载Unity加载场景
【LeetCode】622. Design Circular Queue
Using pipreqs export requirements needed for the project. TXT (rather than the whole environment)
Redis分布式锁
Unity关于编辑器扩展自定义标签,方便扩展Inspector
Guava的Service
多线程下的单例模式
WPS EXCEL 筛选指定长度的文本 内容 字符串
AcWing 3391. 今年的第几天?(简单题)
Qt 下拉复选框(MultiSelectComboBox)(一) 实现下拉框多选,搜索下拉框内容
Laya中关于摄像机跟随人物移动或者点击人物碰撞器触发事件的Demo
【论文笔记】基于动作空间划分的MAXQ自动分层方法
0day_Topsec上网行为管理RCE
Redis集群概念与搭建
并发之ReentrantLock
uni-app 顶部选项卡吸附效果 demo(整理)
STP生成树选举结果查看及验证
【LeetCode】zj面试-把字符串转换成整数
timestamp
The Transformer, BERT, GPT paper intensive reading notes









